Posted to user@flink.apache.org by chrisr123 <ch...@gmail.com> on 2018/10/18 01:51:11 UTC

Restart from checkpoint after program failure

Hi Folks,
I'm trying to restart my program with restored state from a checkpoint after
a program failure (restart strategies tried but exhausted), but I'm not
picking up the restored state. What am I doing wrong here?  

*Summary*
I'm using a very simple app on 1 node just to learn checkpointing.
The app reads from a socket stream with netcat (nc) as the source, and I
deliberately send in some "bad" data to make it throw an exception. The app
uses a simple file URL as the checkpoint backend.

*Checkpoint Backend*
// specified in program:
env.setStateBackend((StateBackend) new FsStateBackend("file:///home/hadoop/flink/checkpoints/"));

For the restart strategy, I specify 3 attempts with a 5-second delay between
attempts:
// specified in program:
int restartAttempts = 3;
int restartDelaySeconds = 5;
long delayBetweenRestarts = restartDelaySeconds * 1000L;
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));

*App Logic:*
All the application does is parse each line as a key,integer pair and output
the accumulated sum per key to stdout (see the source below).
If I start up nc -l 9999 and type values like these, it works fine:
key1,5
key1,3
key1,4

However, if I type in "junk", the program throws an exception trying to parse
'junk' as an integer:
key1,junk
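To make the expected output concrete, here is a plain-Java sketch (no Flink involved) of what MessageParser followed by keyBy(0).sum(1) computes for the input above. The class RunningSumSketch and its methods are hypothetical names for illustration only; in the real job the running total lives in Flink's keyed state, which is exactly what a checkpoint captures and a restore brings back, whereas this HashMap is simply lost when the process dies.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of MessageParser + keyBy(0).sum(1); not Flink code.
public class RunningSumSketch {
    // Stands in for Flink's per-key state: key -> running sum.
    private final Map<String, Long> sums = new HashMap<>();

    // Mirrors MessageParser: lower-case the line, split on ',', parse the second field.
    // Long.valueOf throws NumberFormatException for a line such as "key1,junk".
    public static Map.Entry<String, Long> parse(String input) {
        String[] tokens = input.toLowerCase().split(",");
        return new AbstractMap.SimpleEntry<>(tokens[0], Long.valueOf(tokens[1]));
    }

    // Mirrors keyBy(0).sum(1): add the value to the key's total and return the new total.
    // Parsing happens first, so a bad line leaves the totals untouched.
    public long accept(String line) {
        Map.Entry<String, Long> record = parse(line);
        return sums.merge(record.getKey(), record.getValue(), Long::sum);
    }

    public static void main(String[] args) {
        RunningSumSketch sketch = new RunningSumSketch();
        for (String line : new String[] {"key1,5", "key1,3", "key1,4"}) {
            // prints (key1,5), then (key1,8), then (key1,12)
            System.out.println("(" + line.split(",")[0] + "," + sketch.accept(line) + ")");
        }
        try {
            sketch.accept("key1,junk");
        } catch (NumberFormatException e) {
            System.out.println("key1,junk -> " + e);
        }
    }
}
```

In the Flink job the NumberFormatException is not caught, so it fails the task and hands control to the restart strategy.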

When the application fails, nc also stops. If I start nc before all 3
restart attempts have been tried, everything is fine and the program
restarts, picking up state where it left off.
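The behaviour described above matches what the Flink documentation says about the fixed-delay strategy: the job is restarted at most a given number of times, with a fixed wait before each restart, and fails for good once those attempts are used up. The class below is a hypothetical model of that rule written for illustration, not Flink's implementation. It assumes every restart fails immediately while nothing is listening on port 9999, so the attempts are used up in roughly attempts × delay (about 15 seconds here), ignoring job startup time.

```java
// Hypothetical model (not Flink code) of the documented fixed-delay restart behaviour:
// the job is restarted at most maxAttempts times in total, waiting delayMs before each restart.
public class FixedDelayRestartSketch {
    private final int maxAttempts;
    private final long delayMs;
    private int attemptsUsed = 0;

    public FixedDelayRestartSketch(int maxAttempts, long delayMs) {
        this.maxAttempts = maxAttempts;
        this.delayMs = delayMs;
    }

    // Called on every job failure. Returns true if the job will be restarted after delayMs,
    // false once the attempts are exhausted and the job fails permanently.
    public boolean onFailure() {
        if (attemptsUsed < maxAttempts) {
            attemptsUsed++;
            return true;
        }
        return false;
    }

    public int remainingAttempts() {
        return maxAttempts - attemptsUsed;
    }

    // Assumes every restart fails at once (e.g. no nc listening on the socket):
    // all attempts are then gone after roughly maxAttempts * delayMs, ignoring startup time.
    public long approximateWindowMs() {
        return maxAttempts * delayMs;
    }

    public static void main(String[] args) {
        // Values from the question: 3 attempts, 5 seconds apart.
        FixedDelayRestartSketch strategy = new FixedDelayRestartSketch(3, 5000L);
        for (int failure = 1; failure <= 4; failure++) {
            boolean restarted = strategy.onFailure();
            System.out.println("failure " + failure + ": "
                    + (restarted ? "restart" : "job fails permanently"));
        }
        System.out.println("window ~" + strategy.approximateWindowMs() + " ms");
    }
}
```

With these values the model restarts on the first three failures and gives up on the fourth. In this model a successful restart (nc running again) does not hand back the attempts already used.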

So after all the restarts have been tried and have failed, I want to restart
my program manually and pick up where I left off. Since I am specifying the
checkpoint backend in the program, I thought it would just pick up the state
from there. Then I tried passing in the checkpoint directory using the -s
parameter, but that does not work either:

flink -c <class> <jar> -s c:\home\hadoop\flink\checkpoints 

*App Source:*
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ComputeSumFaultTolerant {

	public static void main(String[] args) throws Exception {

		// Execution Environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		ParameterTool parms = ParameterTool.fromArgs(args);
		env.getConfig().setGlobalJobParameters(parms);
		String host = "localhost";
		int port = 9999;

		System.out.println("ComputeSumFaultTolerant BEGIN");

		// Setup Checkpoint and Retry
		String checkpointBackendURL = "file:///home/hadoop/flink/checkpoints/";
		Utils.configureCheckpoint(env, checkpointBackendURL);
		Utils.configureRestart(env);

		// Read "key,value" lines from the socket and keep a running sum per key
		DataStream<Tuple2<String, Long>> eventStream = env
				.socketTextStream(host, port)
				.map(new MessageParser())
				.keyBy(0)
				.sum(1);
		eventStream.print();

		// Execute
		env.execute("ComputeSumFaultTolerant");
	}

	// Parses "key,value" into (key, value); Long.valueOf throws NumberFormatException for "key1,junk"
	private static class MessageParser implements MapFunction<String, Tuple2<String, Long>> {
		@Override
		public Tuple2<String, Long> map(String input) throws Exception {
			String[] tokens = input.toLowerCase().split(",");
			String key = tokens[0];
			Long value = Long.valueOf(tokens[1]);
			return new Tuple2<String, Long>(key, value);
		}
	}
}

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Utils {

	public static void configureCheckpoint(StreamExecutionEnvironment env, String checkpointBackend) throws Exception {
		// Set Up Checkpoints: trigger a checkpoint every 5 seconds
		env.enableCheckpointing(5000L);

		// set mode to exactly-once (this is the default)
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

		// allow only one checkpoint to be in progress at the same time
		env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

		// make sure 500 ms of progress happen between checkpoints
		env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);

		// checkpoints have to complete within 10 seconds (10000 ms), or are discarded
		env.getCheckpointConfig().setCheckpointTimeout(10000L);

		// Checkpoint Back-end
		env.setStateBackend((StateBackend) new FsStateBackend(checkpointBackend));

		// retain completed checkpoints when the job is cancelled (they are also retained when it fails)
		System.out.println("CHECKPOINT IS EXTERNALIZED");
		env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

		System.out.println("External enabled=" + env.getCheckpointConfig().isExternalizedCheckpointsEnabled());
	}

	public static void configureRestart(StreamExecutionEnvironment env) throws Exception {

		// Restart Strategy
		// Fixed Delay: up to 3 restarts, 5 seconds apart
		int restartAttempts = 3;
		int restartDelaySeconds = 5;
		long delayBetweenRestarts = restartDelaySeconds * 1000L;
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));

		// Failure Rate Restart (alternative; only the last setRestartStrategy call takes effect)
		int failureRate = 3;
		Time failureInterval = Time.of(5, TimeUnit.MINUTES);
		Time delayInterval = Time.of(5, TimeUnit.SECONDS);
		// env.setRestartStrategy(RestartStrategies.failureRateRestart(failureRate, failureInterval, delayInterval));

		// No Restart
		// env.setRestartStrategy(RestartStrategies.noRestart());
	}

}

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Restart from checkpoint after program failure

Posted by Paul Lam <pa...@gmail.com>.
Hi,

I think you need to specify the directory of a concrete checkpoint, instead of the root checkpoint directory, in order to restore the state. The directory name should look like chk-${id}.

The job ID changes when you resubmit the job, so the JobManager cannot recognize the retained checkpoint of the previous submission, even though you are using the same checkpoint root directory.
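As a sketch of how to locate that directory, the helper below picks the chk-<n> directory with the highest n under one job's checkpoint directory. It assumes the layout <checkpoint-root>/<old-job-id>/chk-<n>/_metadata, which is what I would expect the FsStateBackend to write for retained checkpoints in recent Flink versions; please check it against your own checkpoint directory. LatestCheckpointFinder is a hypothetical helper, not part of Flink. It only considers directories that contain a _metadata file, so a checkpoint that never completed is skipped, and it compares ids numerically so chk-12 wins over chk-3.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper (not part of Flink). Assumes retained checkpoints are laid out as
// <checkpoint-root>/<job-id>/chk-<n>/_metadata.
public class LatestCheckpointFinder {

    // Returns n for a directory named "chk-<n>", or -1 for anything else (e.g. "shared", "taskowned").
    public static long checkpointId(Path dir) {
        String name = dir.getFileName().toString();
        if (!name.startsWith("chk-")) {
            return -1L;
        }
        try {
            return Long.parseLong(name.substring("chk-".length()));
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    // Returns the chk-<n> directory with the highest n under jobDir that contains a _metadata file.
    public static Optional<Path> latestCheckpoint(Path jobDir) throws IOException {
        try (Stream<Path> children = Files.list(jobDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> checkpointId(p) >= 0)
                    .filter(p -> Files.exists(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointId));
        }
    }

    public static void main(String[] args) throws IOException {
        // Argument: the old job's directory, e.g. /home/hadoop/flink/checkpoints/<old-job-id>
        Path jobDir = Paths.get(args[0]);
        Optional<Path> latest = latestCheckpoint(jobDir);
        if (latest.isPresent()) {
            System.out.println(latest.get().toUri());
        } else {
            System.out.println("No completed checkpoint found under " + jobDir);
        }
    }
}
```

The printed path can then be given to flink run -s <path> -c <class> <jar>. Note that the CLI options have to come before the jar file: anything after the jar is passed to your program as its own arguments, which is why -s placed after the jar has no effect.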

Best,
Paul Lam
