Posted to user@flink.apache.org by vino yang <ya...@gmail.com> on 2018/08/24 10:55:00 UTC

Re: Can I only use checkpoints instead of savepoints in production?

Hi Henry,

A good answer from stackoverflow:

Apache Flink's checkpoints and savepoints are similar in that they are both
mechanisms for preserving the internal state of Flink applications.

Checkpoints are taken automatically and are used for automatically restarting
a job in case of a failure.

Savepoints, on the other hand, are taken manually, are always stored
externally, and are used for starting a "new" job with the previous internal
state, e.g. for:


   - Bug fixing
   - Flink version upgrade
   - A/B testing, etc.

Underneath they are in fact the same mechanism/code path with some subtle
nuances.

About your questions:

1) No problem. The main purpose of checkpoints is to automatically restore
the job when it fails.
2) You can also use the REST client to trigger a savepoint.
3) I don't know, but it seems that their usage scenarios and purposes are
still different. Maybe Till and Chesnay can answer this question.
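For 2), the JobManager's monitoring REST API (in Flink 1.6) can trigger a savepoint without a local Flink installation. A sketch with placeholder host, job ID, request ID, and target directory (adjust all of them to your setup; port 8081 is the default):

```shell
# Trigger a savepoint for a running job via the Flink 1.6 REST API.
curl -X POST http://<jobmanager-host>:8081/jobs/<job-id>/savepoints \
  -H "Content-Type: application/json" \
  -d '{"target-directory": "hdfs:///flink/savepoints", "cancel-job": false}'

# The response contains a request id; poll the trigger's status with it.
curl http://<jobmanager-host>:8081/jobs/<job-id>/savepoints/<request-id>
```

This makes a cron-driven savepoint possible from any machine that can reach the JobManager, which addresses the "no Flink installed on the YARN nodes" concern.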

Thanks, vino.

徐涛 <ha...@gmail.com> wrote on Fri, Aug 24, 2018 at 3:19 PM:

> Hi All,
> I checked the documentation of Flink release 1.6 and found that I can also
> use checkpoints to resume the program. As I encountered some problems
> when using savepoints, I have the following questions:
> 1. Can I use only checkpoints, and not savepoints, since checkpoints can
> also be used to resume programs? If I do so, is there any problem?
> 2. Checkpoints can be generated automatically, but savepoints seem to be
> generated only manually. I would have to write a crontab entry to generate
> the savepoint. More than this, my Flink program runs on YARN, and on the
> machines only Hadoop and YARN are installed, so I cannot use the flink
> savepoint command to generate a savepoint, and I have no authorization to
> install Flink on the machines.
> 3. Will checkpoints and savepoints be merged in later releases?
> Thanks very much.
>
> Best,
> Henry
>

Re: Can I only use checkpoints instead of savepoints in production?

Posted by Andrey Zagrebin <an...@data-artisans.com>.
This thread is also useful in this context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html



Re: Can I only use checkpoints instead of savepoints in production?

Posted by Andrey Zagrebin <an...@data-artisans.com>.
Hi Henry,

In addition to Vino’s answer, there are several things to keep in mind about “checkpoints vs savepoints".

Checkpoints are designed mostly for fault tolerance of a running Flink job and for automatic recovery;
that is why, by default, Flink manages their storage itself. It is correct, though, that you can configure checkpoints to be retained (externalised), control where they are stored, and resume a failed/cancelled job from them.
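The retained (externalised) checkpoints mentioned above are enabled per job through the checkpoint config. A minimal Java sketch against the 1.6 API; the interval and cleanup mode here are illustrative choices, not recommendations:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60 s
        // Keep the latest checkpoint around on cancellation so it can be
        // resumed manually, like a savepoint.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```

A retained checkpoint can then be resumed the same way as a savepoint, e.g. `flink run -s <checkpoint-meta-path> ...` (note that `state.checkpoints.dir` must be configured for retained checkpoints to have a stable location).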

However, their format may be optimised in any new Flink version and may change between versions.
This means that, in general, you might not be able to upgrade the Flink version, or change the structure of the running job, using only checkpoints.

Moreover, it is currently not guaranteed that you will always be able to rescale your job from a checkpoint (change its parallelism), although this is technically possible in Flink 1.6.0 at the moment, even for incremental checkpoints.

Savepoints are designed for manual user intervention during maintenance operations;
that is why their storage is under the user's control in the first place. They have a more stable internal format, which allows manual migration between Flink or job versions, as well as rescaling.

Cheers,
Andrey



Re: Can I only use checkpoints instead of savepoints in production?

Posted by Averell <lv...@gmail.com>.
Thank you, Vino.
I found it, http://<clusterIP>:8088/ 

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Can I only use checkpoints instead of savepoints in production?

Posted by vino yang <ya...@gmail.com>.
Hi Averell,

I have not used AWS products, but if it works like plain YARN, you can open
YARN's web UI.
The JM log is the YARN ApplicationMaster's container log, and the other
containers' logs are the TM logs.
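On YARN (EMR included), the same logs can also be fetched from the command line once log aggregation is enabled; a sketch with a placeholder application ID:

```shell
# List YARN applications to find the Flink application's ID.
yarn application -list

# Dump all container logs for it: the ApplicationMaster container holds the
# JM log, the remaining containers hold the TM logs.
yarn logs -applicationId <application-id>
```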

Thanks, vino.


Re: Can I only use checkpoints instead of savepoints in production?

Posted by Averell <lv...@gmail.com>.
Hi Vino,

Could you please tell me where I should find the JM and TM logs? I'm running
on an AWS EMR cluster using YARN.

Thanks and best regards,
Averell




Re: Can I only use checkpoints instead of savepoints in production?

Posted by vino yang <ya...@gmail.com>.
Hi Averell,

This problem is caused by a heartbeat timeout between the JM and a TM. You can
locate it by:
1) Checking the network status of the node at the time, e.g. whether
connections to other systems had problems as well;
2) Checking the TM log for more specific reasons;
3) Checking the load on the node at the time of the timeout;
4) Confirming whether something like a Full GC caused the JVM process to be
stuck at the time.

Also, I don't know whether you are using the default timeout; if so, you can
increase it appropriately.
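The timeout in question is configurable in flink-conf.yaml; the values below are, to my knowledge, the Flink 1.5+ defaults (in milliseconds), shown only as a starting point for tuning:

```yaml
# flink-conf.yaml: JM <-> TM heartbeat tuning (milliseconds).
heartbeat.interval: 10000   # how often heartbeats are sent (default)
heartbeat.timeout: 50000    # raise this if TMs stall under heavy load or GC (default)
```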

Thanks, vino.


Re: Can I only use checkpoints instead of savepoints in production?

Posted by Averell <lv...@gmail.com>.
Thank you Vino. 

I put the message in a  tag, and I don't know why it was not shown in the
email thread. I paste the error message below in this email.

Anyway, it seems that was an issue with enabling checkpointing. Now I am
able to get it turned on properly, and my job is getting restored
automatically.
I am trying to test my scenarios now. Found some issues, and I think it
would be better to ask in a separate thread.

Thanks and regards,
Averell

=====
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 457d8f370ef8a50bb462946e1f12b80e)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:661)
......
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1535279282999_0032_01_000013 timed out.
	at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1610)
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




Re: Can I only use checkpoints instead of savepoints in production?

Posted by vino yang <ya...@gmail.com>.
Hi Averell,

What is the error message? It seems you forgot to post it.
As far as I know, if you enable checkpointing, the job will automatically
resume when it fails.
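For completeness, the automatic resume is governed by the restart strategy; when checkpointing is enabled, Flink falls back to a fixed-delay strategy by default, and it can be pinned down explicitly in flink-conf.yaml (the numbers below are illustrative, not recommendations):

```yaml
# flink-conf.yaml: retry a failed job up to 3 times, 10 s apart,
# before it transitions to FAILED permanently.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```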

Thanks, vino.


Re: Can I only use checkpoints instead of savepoints in production?

Posted by Averell <lv...@gmail.com>.
Thank you Vino.

I sometimes got an error message like the one below. It looks like my
executors got overloaded. Here I have another question: is there an
existing mechanism that allows me to have the job restored automatically?

Thanks and best regards,
Averell






Re: Can I only use checkpoints instead of savepoints in production?

Posted by vino yang <ya...@gmail.com>.
Hi Averell,

Checkpoints are triggered automatically and periodically, according to the
checkpoint interval set by the user. I believe you should have no
doubt about this.
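The interval, and the related pacing knobs, are set per job on the checkpoint config; a Java sketch against the 1.6-era API, with all values purely illustrative:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointPacingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // trigger a checkpoint every 60 s
        // Leave breathing room between checkpoints and bound how long one may take.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
        env.getCheckpointConfig().setCheckpointTimeout(600_000);
    }
}
```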

There are many reasons for a job failure.
The technical definition is that the job does not enter a final termination
state normally.
Here is a document [1] with a state-transition diagram of job status, so you
can see how a Flink job's status is converted.
For example: a subtask fails or throws an exception; a subtask throws an
exception while taking a checkpoint (though this does not necessarily lead
to a job failure); a connection timeout between a TM and the JM; TM
downtime; a JM leader switch; and more.

*So, in these scenarios (including the ones you enumerated) you can simulate
the failure recovery of a job.*

More specifically, job recovery is based on the child nodes of ZooKeeper's
"/jobgraph" path.
If a job does not enter a termination state normally, its child nodes will
not be cleaned up.

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html

Thanks, vino.


Re: Can I only use checkpoints instead of savepoints in production?

Posted by Averell <lv...@gmail.com>.
Hi Vino,

Regarding the statement "checkpoints are taken automatically and are used
for automatically restarting a job in case of a failure", I do not quite
understand the definition of a failure, and how to simulate one while
testing my application. Possible scenarios that I can think of:
   (1) the Flink application is killed
   (2) the cluster crashes
   (3) one of the servers in the cluster crashes
   (4) an unhandled exception is raised when abnormal data is received
   ...

Could you please help explain?

Thanks and regards,
Averell


