You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "dixingxing85@163.com" <di...@163.com> on 2020/06/11 12:21:18 UTC

如何做checkpoint的灾备

Hi Flink社区,
目前我们在调研checkpoint 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?



Best,
Xingxing Di

Re: Re: 如何做checkpoint的灾备

Posted by "dixingxing85@163.com" <di...@163.com>.
@Congxian  感谢你的回复,我们会参考你的思路。



Best,
Xingxing Di
 
Sender: Congxian Qiu
Send Time: 2020-06-15 09:55
Receiver: user-zh
cc: zhangyingchen; pengxingbo
Subject: Re: Re: 如何做checkpoint的灾备
正常的流程来说,能找到 checkpoint meta 文件,checkpoint 就是完整的。但是也可能会出现其他的一些异常(主要可能会有
FileNotFound 等异常),那些异常如果需要提前知道的话,可以再 JM 端通过遍历 checkpoint meta 文件来进行判断。
 
对于希望从 checkpoint 恢复的场景来说,可以考虑下能否在你们的场景中把 checkpoint meta
统一存储到某个地方,这样后续直接从这个地方读取即可。
 
Best,
Congxian
 
 
dixingxing85@163.com <di...@163.com> 于2020年6月15日周一 上午12:06写道:
 
> @Congxian Qiu 感谢你的回复!
> 1.先回答你的疑问,我们目前checkpoint跨机房容灾的需求比较强烈,是需要上生产的;关于flink 1.11
> 的savepoint,我们后面可以尝试一下,但是最近几个月还没有升级1.11版本的计划。
> 2.你说的定期把文件copy到本地集群,然后再复制到远程集群感觉是个可行的方案,我们让hadoop运维同学评估下。
> 3.checkpoint双写:
> 感谢分享checkpoint双写的思路,这块我们最近几天会先出个MVP版本验证一下,如果遇到问题肯定还需要请教一下。
>
> 异常处理确实需要仔细设计,至少需要保证写备用checkpoint时,不会影响原checkpoint。同时需要引入metric,记录备用checkpoint存储失败的情况
> 关于选用哪个集群的checkpoint,目前我们是打算手动控制的,会在flink上层提供一个可以批量切换HDFS(Flink集群)的接口
> @唐云  感谢你的恢复!
> 我们试一下你说的方式,感谢。
>
> 题外话,我还有个疑问,就是如何判断checkpoint是否可用?
>
> 我们基本没有使用savepoint,如果作业挂了需要重新启动时,我们会指定从checkpoint恢复。如果从最新的checkpoint恢复,那很有可能因为checkpoint不完整,导致作业无法启动。
>
> 目前我们是简单处理的,优先使用倒数第2个checkpoint,但如果作业checkpoint少于2个,可能需要查找checkpoint路径,并手动指定。
> PS:我们用的是flink 1.9.2
>
>
>
> Best,
> Xingxing Di
>
> 发件人: Yun Tang
> 发送时间: 2020-06-14 00:48
> 收件人: user-zh
> 主题: Re: 如何做checkpoint的灾备
> Hi Xingxing
>
> 由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1]
> 来忽略失败的拷贝(例如FileNotFoundException)
> 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options
>
> 祝好
> 唐云
>
> ________________________________
> From: Congxian Qiu <qc...@gmail.com>
> Sent: Saturday, June 13, 2020 16:54
> To: user-zh <us...@flink.apache.org>
> Subject: Re: 如何做checkpoint的灾备
>
> Hi
>
> 你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp
> 的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。
>
> 另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm
> 端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑
> checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs
> 异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2
> 步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。
>
> 另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢?
> savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比
> checkpoint 慢很多
>
> [1]
>
> https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91
> [2] https://issues.apache.org/jira/browse/FLINK-5763
>
> Best,
> Congxian
>
>
> dixingxing85@163.com <di...@163.com> 于2020年6月11日周四 下午8:21写道:
>
> > Hi Flink社区,
> > 目前我们在调研checkpoint
> > 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
> >
> >
> 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
> > 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
> > 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?
> >
> >
> >
> > Best,
> > Xingxing Di
> >
>

Re: Re: 如何做checkpoint的灾备

Posted by Congxian Qiu <qc...@gmail.com>.
正常的流程来说,能找到 checkpoint meta 文件,checkpoint 就是完整的。但是也可能会出现其他的一些异常(主要可能会有
FileNotFound 等异常),那些异常如果需要提前知道的话,可以再 JM 端通过遍历 checkpoint meta 文件来进行判断。

对于希望从 checkpoint 恢复的场景来说,可以考虑下能否在你们的场景中把 checkpoint meta
统一存储到某个地方,这样后续直接从这个地方读取即可。

Best,
Congxian


dixingxing85@163.com <di...@163.com> 于2020年6月15日周一 上午12:06写道:

> @Congxian Qiu 感谢你的回复!
> 1.先回答你的疑问,我们目前checkpoint跨机房容灾的需求比较强烈,是需要上生产的;关于flink 1.11
> 的savepoint,我们后面可以尝试一下,但是最近几个月还没有升级1.11版本的计划。
> 2.你说的定期把文件copy到本地集群,然后再复制到远程集群感觉是个可行的方案,我们让hadoop运维同学评估下。
> 3.checkpoint双写:
> 感谢分享checkpoint双写的思路,这块我们最近几天会先出个MVP版本验证一下,如果遇到问题肯定还需要请教一下。
>
> 异常处理确实需要仔细设计,至少需要保证写备用checkpoint时,不会影响原checkpoint。同时需要引入metric,记录备用checkpoint存储失败的情况
> 关于选用哪个集群的checkpoint,目前我们是打算手动控制的,会在flink上层提供一个可以批量切换HDFS(Flink集群)的接口
> @唐云  感谢你的恢复!
> 我们试一下你说的方式,感谢。
>
> 题外话,我还有个疑问,就是如何判断checkpoint是否可用?
>
> 我们基本没有使用savepoint,如果作业挂了需要重新启动时,我们会指定从checkpoint恢复。如果从最新的checkpoint恢复,那很有可能因为checkpoint不完整,导致作业无法启动。
>
> 目前我们是简单处理的,优先使用倒数第2个checkpoint,但如果作业checkpoint少于2个,可能需要查找checkpoint路径,并手动指定。
> PS:我们用的是flink 1.9.2
>
>
>
> Best,
> Xingxing Di
>
> 发件人: Yun Tang
> 发送时间: 2020-06-14 00:48
> 收件人: user-zh
> 主题: Re: 如何做checkpoint的灾备
> Hi Xingxing
>
> 由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1]
> 来忽略失败的拷贝(例如FileNotFoundException)
> 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options
>
> 祝好
> 唐云
>
> ________________________________
> From: Congxian Qiu <qc...@gmail.com>
> Sent: Saturday, June 13, 2020 16:54
> To: user-zh <us...@flink.apache.org>
> Subject: Re: 如何做checkpoint的灾备
>
> Hi
>
> 你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp
> 的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。
>
> 另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm
> 端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑
> checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs
> 异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2
> 步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。
>
> 另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢?
> savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比
> checkpoint 慢很多
>
> [1]
>
> https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91
> [2] https://issues.apache.org/jira/browse/FLINK-5763
>
> Best,
> Congxian
>
>
> dixingxing85@163.com <di...@163.com> 于2020年6月11日周四 下午8:21写道:
>
> > Hi Flink社区,
> > 目前我们在调研checkpoint
> > 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
> >
> >
> 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
> > 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
> > 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?
> >
> >
> >
> > Best,
> > Xingxing Di
> >
>

回复: Re: 如何做checkpoint的灾备

Posted by "dixingxing85@163.com" <di...@163.com>.
@Congxian Qiu 感谢你的回复!
1.先回答你的疑问,我们目前checkpoint跨机房容灾的需求比较强烈,是需要上生产的;关于flink 1.11 的savepoint,我们后面可以尝试一下,但是最近几个月还没有升级1.11版本的计划。
2.你说的定期把文件copy到本地集群,然后再复制到远程集群感觉是个可行的方案,我们让hadoop运维同学评估下。
3.checkpoint双写:
感谢分享checkpoint双写的思路,这块我们最近几天会先出个MVP版本验证一下,如果遇到问题肯定还需要请教一下。
异常处理确实需要仔细设计,至少需要保证写备用checkpoint时,不会影响原checkpoint。同时需要引入metric,记录备用checkpoint存储失败的情况
关于选用哪个集群的checkpoint,目前我们是打算手动控制的,会在flink上层提供一个可以批量切换HDFS(Flink集群)的接口
@唐云  感谢你的恢复!
我们试一下你说的方式,感谢。

题外话,我还有个疑问,就是如何判断checkpoint是否可用?
我们基本没有使用savepoint,如果作业挂了需要重新启动时,我们会指定从checkpoint恢复。如果从最新的checkpoint恢复,那很有可能因为checkpoint不完整,导致作业无法启动。
目前我们是简单处理的,优先使用倒数第2个checkpoint,但如果作业checkpoint少于2个,可能需要查找checkpoint路径,并手动指定。
PS:我们用的是flink 1.9.2 



Best,
Xingxing Di
 
发件人: Yun Tang
发送时间: 2020-06-14 00:48
收件人: user-zh
主题: Re: 如何做checkpoint的灾备
Hi Xingxing
 
由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1] 来忽略失败的拷贝(例如FileNotFoundException) 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。
 
[1] https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options
 
祝好
唐云
 
________________________________
From: Congxian Qiu <qc...@gmail.com>
Sent: Saturday, June 13, 2020 16:54
To: user-zh <us...@flink.apache.org>
Subject: Re: 如何做checkpoint的灾备
 
Hi
 
你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp
的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。
 
另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm
端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑
checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs
异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2
步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。
 
另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢?
savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比
checkpoint 慢很多
 
[1]
https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91
[2] https://issues.apache.org/jira/browse/FLINK-5763
 
Best,
Congxian
 
 
dixingxing85@163.com <di...@163.com> 于2020年6月11日周四 下午8:21写道:
 
> Hi Flink社区,
> 目前我们在调研checkpoint
> 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
>
> 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
> 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
> 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?
>
>
>
> Best,
> Xingxing Di
>

Re: 如何做checkpoint的灾备

Posted by Yun Tang <my...@live.com>.
Hi Xingxing

由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1] 来忽略失败的拷贝(例如FileNotFoundException) 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。

[1] https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options

祝好
唐云

________________________________
From: Congxian Qiu <qc...@gmail.com>
Sent: Saturday, June 13, 2020 16:54
To: user-zh <us...@flink.apache.org>
Subject: Re: 如何做checkpoint的灾备

Hi

你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp
的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。

另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm
端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑
checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs
异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2
步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。

另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢?
savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比
checkpoint 慢很多

[1]
https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91
[2] https://issues.apache.org/jira/browse/FLINK-5763

Best,
Congxian


dixingxing85@163.com <di...@163.com> 于2020年6月11日周四 下午8:21写道:

> Hi Flink社区,
> 目前我们在调研checkpoint
> 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
>
> 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
> 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
> 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?
>
>
>
> Best,
> Xingxing Di
>

Re: 如何做checkpoint的灾备

Posted by Congxian Qiu <qc...@gmail.com>.
Hi

你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp
的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。

另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm
端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑
checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs
异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2
步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。

另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢?
savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比
checkpoint 慢很多

[1]
https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91
[2] https://issues.apache.org/jira/browse/FLINK-5763

Best,
Congxian


dixingxing85@163.com <di...@163.com> 于2020年6月11日周四 下午8:21写道:

> Hi Flink社区,
> 目前我们在调研checkpoint
> 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
>
> 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
> 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
> 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?
>
>
>
> Best,
> Xingxing Di
>