Posted to user@flink.apache.org by Sandeep khanzode <sa...@shiftright.ai> on 2021/03/26 17:55:13 UTC
Restore from Checkpoint from local Standalone Job
Hello
I was reading this: https://stackoverflow.com/questions/61010970/flink-resume-from-externalised-checkpoint-question
I am trying to run a standalone job on my local machine with a single job manager and task manager.
I have enabled checkpointing as below:
env.setStateBackend(new RocksDBStateBackend("file:///Users/test/checkpoint", true));
env.enableCheckpointing(30 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
After I stop my job (I also tried to cancel the job using bin/flink cancel -s /Users/test/savepoint <jobId>), I tried to start the same job using…
./standalone-job.sh start-foreground test.jar --job-id <UUID> --job-classname com.test.MyClass --fromSavepoint /Users/test/savepoint
But it never restores the state, and always starts afresh.
In Flink, I see this:
StandaloneCompletedCheckpointStore
* {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}.
public void recover() throws Exception {
// Nothing to do
}
Does this have something to do with not being able to restore state?
Does this need Zookeeper or K8S HA for functioning?
Thanks,
Sandeep
Re: Restore from Checkpoint from local Standalone Job
Posted by Piotr Nowojski <pn...@apache.org>.
Hi Sandeep,
I think it should work fine with `StandaloneCompletedCheckpointStore`.
Have you checked whether your directory /Users/test/savepoint is being
populated in the first place? And if so, is the restarted job throwing
any exceptions, for example that it cannot access those files?
Also note that cancel with savepoint is deprecated; you should be using
stop with savepoint [1].
Piotrek
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#terminating-a-job
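
The check suggested above (is the directory populated at all?) can be sketched in plain Java, without any Flink dependency. This is only a rough sketch, not an official tool: the class name SnapshotLocator and its methods are made up for illustration. It assumes the usual layout in which each completed savepoint or retained checkpoint sits in its own subdirectory containing a file named _metadata, for example <target-dir>/savepoint-.../_metadata or <checkpoint-dir>/<job-id>/chk-<n>/_metadata. The default path is the one from the original message.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink.
public class SnapshotLocator {

    // Assumed name of the metadata file inside a completed savepoint/checkpoint directory.
    static final String METADATA_FILE = "_metadata";

    /**
     * Searches up to three levels below root and returns the directory that
     * holds the most recently modified _metadata file, or empty if none exists.
     */
    static Optional<Path> latestSnapshotDir(Path root) throws IOException {
        if (!Files.isDirectory(root)) {
            return Optional.empty();
        }
        try (Stream<Path> paths = Files.walk(root, 3)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().equals(METADATA_FILE))
                    .max(Comparator.comparingLong(SnapshotLocator::lastModifiedMillis))
                    .map(Path::getParent);
        }
    }

    // Unreadable timestamps sort first so that they never win the max().
    private static long lastModifiedMillis(Path p) {
        try {
            return Files.getLastModifiedTime(p).toMillis();
        } catch (IOException e) {
            return Long.MIN_VALUE;
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : "/Users/test/savepoint");
        Optional<Path> dir = latestSnapshotDir(root);
        System.out.println(dir
                .map(d -> "Found snapshot directory: " + d)
                .orElse("No " + METADATA_FILE + " file found under " + root));
    }
}
```

If this reports a subdirectory rather than the root itself, it may be worth passing that full subdirectory path to --fromSavepoint instead of the parent directory. If it reports nothing, the savepoint was probably never written.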