Posted to issues@flink.apache.org by "Bowen Li (JIRA)" <ji...@apache.org> on 2017/09/01 06:21:00 UTC

[jira] [Updated] (FLINK-7566) if there's only one checkpointing metadata file in <dir>, `flink run -s <dir>` should successfully resume from that metadata file

     [ https://issues.apache.org/jira/browse/FLINK-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-7566:
----------------------------
    Description: 
Currently, if we want to start a Flink job from a checkpoint, we have to run `flink run -s <dir>/checkpoint_metadata-xxxxx` and explicitly specify the checkpoint metadata file name 'checkpoint_metadata-xxxxx'. Since the metadata file name changes with every checkpoint, it's hard to programmatically restart a failed Flink job. The error in jobmanager.log looks like:


{code:java}
2017-08-30 07:25:04,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job xxxx (22defcf962ff2ac2e7fe99354f5ab168) switched from state FAILING to FAILED.
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1396)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot find meta data file in directory s3://xxxx/checkpoints. Please try to load the savepoint directly from the meta data file instead of the directory.
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:262)
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:69)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
	... 10 more
{code}


What I'd like instead: users should be able to start a Flink job by running `flink run -s <dir>` if there is exactly one checkpoint metadata file in <dir>. If there are zero metadata files or more than one, the command can fail as it does today. This way, we can programmatically restart a failed Flink job by hardcoding <dir>.

To achieve that, I think there are two approaches we can take:

1) modify {{CheckpointCoordinator.restoreSavepoint}} to check how many metadata files are in <dir> and resolve the single one automatically (see the sketch below)
2) add another command-line option like '-sd' / '--savepointdirectory' to explicitly load a directory
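
For approach 1, here is a minimal sketch of the directory-resolution step. It is illustrative only: the class and method names, the metadata file prefix argument, and the way this would be wired into {{CheckpointCoordinator.restoreSavepoint}} are assumptions, not the actual Flink implementation.

{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class SavepointPathResolver {

    /**
     * If 'savepointPointer' is a directory, resolve it to the single checkpoint
     * metadata file inside it; otherwise return the path unchanged. Fails if the
     * directory contains zero or more than one matching files, which keeps
     * today's behavior for ambiguous directories.
     *
     * 'metadataPrefix' (e.g. "checkpoint_metadata") is an assumption used for
     * illustration only.
     */
    public static Path resolveMetadataFile(String savepointPointer, String metadataPrefix) throws IOException {
        Path path = new Path(savepointPointer);
        FileSystem fs = FileSystem.get(path.toUri());

        if (!fs.getFileStatus(path).isDir()) {
            // Already points at a concrete metadata file.
            return path;
        }

        Path metadataFile = null;
        for (FileStatus child : fs.listStatus(path)) {
            if (child.getPath().getName().startsWith(metadataPrefix)) {
                if (metadataFile != null) {
                    throw new IOException("Found more than one metadata file in " + path
                            + ". Please point to the metadata file directly.");
                }
                metadataFile = child.getPath();
            }
        }

        if (metadataFile == null) {
            throw new IOException("Cannot find a metadata file in directory " + path + ".");
        }
        return metadataFile;
    }
}
{code}

With something like this in place, `flink run -s <dir>` could resume from the resolved metadata file while still failing fast when the directory is empty or ambiguous.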

  was:
Currently, if we want to start a Flink job from a checkpoint, we have to run `flink run -s <dir>/checkpoint_metadata-xxxxx` and explicitly specify the checkpoint metadata file name 'checkpoint_metadata-xxxxx'. Since the metadata file name changes with every checkpoint, it's hard to programmatically restart a failed Flink job. The error in jobmanager.log looks like:


{code:java}
2017-08-30 07:25:04,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job xxxx (22defcf962ff2ac2e7fe99354f5ab168) switched from state FAILING to FAILED.
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1396)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot find meta data file in directory s3://xxxx/checkpoints. Please try to load the savepoint directly from the meta data file instead of the directory.
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:262)
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:69)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
	... 10 more
{code}


What I'd like instead: users should be able to start a Flink job by running `flink run -s <dir>` if there is exactly one checkpoint metadata file in <dir>. If there are zero metadata files or more than one, the command can fail as it does today. This way, we can programmatically restart a failed Flink job by hardcoding <dir>.

To achieve that, I think there are two approaches we can take:

1) modify {{CheckpointCoordinator.restoreSavepoint}} to check how many metadata files are in <dir>
2) add another command-line option like '-sd' (stands for savepointdirectory) to explicitly load a directory


> if there's only one checkpointing metadata file in <dir>, `flink run -s <dir>` should successfully resume from that metadata file 
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7566
>                 URL: https://issues.apache.org/jira/browse/FLINK-7566
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.4.0
>
>
> Currently, if we want to start a Flink job from a checkpoint, we have to run `flink run -s <dir>/checkpoint_metadata-xxxxx` and explicitly specify the checkpoint metadata file name 'checkpoint_metadata-xxxxx'. Since the metadata file name changes with every checkpoint, it's hard to programmatically restart a failed Flink job. The error in jobmanager.log looks like:
> {code:java}
> 2017-08-30 07:25:04,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job xxxx (22defcf962ff2ac2e7fe99354f5ab168) switched from state FAILING to FAILED.
> org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1396)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Cannot find meta data file in directory s3://xxxx/checkpoints. Please try to load the savepoint directly from the meta data file instead of the directory.
> 	at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:262)
> 	at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:69)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
> 	... 10 more
> {code}
> What I'd like instead: users should be able to start a Flink job by running `flink run -s <dir>` if there is exactly one checkpoint metadata file in <dir>. If there are zero metadata files or more than one, the command can fail as it does today. This way, we can programmatically restart a failed Flink job by hardcoding <dir>.
> To achieve that, I think there are two approaches we can take:
> 1) modify {{CheckpointCoordinator.restoreSavepoint}} to check how many metadata files are in <dir>
> 2) add another command-line option like '-sd' / '--savepointdirectory' to explicitly load a directory



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)