Posted to user@flink.apache.org by Ivan <em...@gmail.com> on 2017/07/26 14:15:51 UTC

Is there a way to enable checkpointing from flink-conf.yaml?

Hi Flink users,

We are using Flink as the runtime for our Beam jobs, which works great. 
Recently we wanted to enable a restart strategy 
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/restart_strategies.html> 
in our Flink cluster. From the documentation, I understand that a restart 
strategy only works when checkpointing is enabled. I'm trying to find out 
whether checkpointing can be enabled from flink-conf.yaml, as an 
equivalent to calling 
"flinkStreamEnv.enableCheckpointing(checkpointInterval);" on the 
StreamExecutionEnvironment.

The reason we want to configure it through flink-conf.yaml is that we use 
Helm to create a Flink cluster on demand for each dedicated job, which 
works great on Kubernetes. With Beam, enabling checkpointing means 
creating FlinkPipelineOptions in pipeline code that is otherwise 
cross-platform (like using a Hibernate Session in JPA code). So we are 
trying to find a way to enable it from flink-conf.yaml.

A sample flink-conf.yaml is below.

   flink-conf.yaml: |
     blob.server.port: 6124
     jobmanager.rpc.address: address-cache-flink-jobmanager
     jobmanager.rpc.port: 6123
     jobmanager.heap.mb: 256
     taskmanager.heap.mb: 756
     taskmanager.numberOfTaskSlots: 4
     parallelism.default: 16
     metrics.reporters: prom
     metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
     metrics.reporter.prom.port: 9100-9101
     metrics.reporter.prom.prefix: flink_jm_
     restart-strategy: fixed-delay
     restart-strategy.fixed-delay.attempts: 3
     restart-strategy.fixed-delay.delay: 300 s
     state.backend: filesystem
     state.backend.fs.checkpointdir: file:///var/nfs/ephem_store/flink/checkpoints


Re: Is there a way to enable checkpointing from flink-conf.yaml?

Posted by Ivan <em...@gmail.com>.
One more thing I found a little confusing in the Restart Strategies 
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/restart_strategies.html> 
documentation is this passage:


/The default restart strategy is set via Flink’s configuration file 
flink-conf.yaml. The configuration parameter restart-strategy defines 
which strategy is taken. If checkpointing is not enabled, the “no 
restart” strategy is used. If checkpointing is activated and the restart 
strategy has not been configured, the fixed-delay strategy is used with 
Integer.MAX_VALUE restart attempts. See the following list of available 
restart strategies to learn what values are supported./

I think the sentences about checkpointing in this passage only take 
effect when no restart strategy is defined in flink-conf.yaml. It is 
quite easy to misread them as saying that restart strategies are only 
enabled when checkpointing is enabled, and that the "no restart" 
strategy is used otherwise.
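
To make that reading concrete, here is a minimal sketch of the resolution 
order as interpreted above. It models this interpretation of the quoted 
documentation only and has not been checked against Flink's source; the 
class name, method name, and returned labels are made up for illustration.

```java
/** Sketch of the default restart strategy resolution, as read from the quoted docs. */
final class RestartStrategyResolution {

    /**
     * @param configured value of "restart-strategy" from flink-conf.yaml, or null if unset
     * @param checkpointingEnabled whether the job enabled checkpointing
     * @return an illustrative label for the strategy that would apply
     */
    static String resolve(String configured, boolean checkpointingEnabled) {
        // 1. An explicit setting in flink-conf.yaml is used as-is,
        //    whether or not checkpointing is enabled.
        if (configured != null && !configured.trim().isEmpty()) {
            return configured.trim();
        }
        // 2. Nothing configured, checkpointing on: fixed-delay with
        //    Integer.MAX_VALUE restart attempts.
        if (checkpointingEnabled) {
            return "fixed-delay, attempts=" + Integer.MAX_VALUE;
        }
        // 3. Nothing configured, checkpointing off: no restart.
        return "no restart";
    }

    public static void main(String[] args) {
        System.out.println(resolve("fixed-delay", false));
        System.out.println(resolve(null, true));
        System.out.println(resolve(null, false));
    }
}
```

Under this reading, the sample configuration above (restart-strategy: 
fixed-delay) would fall into the first branch even without checkpointing.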


Thanks

On 26/07/2017 10:32 PM, Chesnay Schepler wrote:
> There is no option that enables checkpointing for all jobs.
>
> If you have control over /all/ jobs, as a *hack*, you could load the 
> configuration manually (I don't think it is exposed through the 
> execution environment) using 
> "GlobalConfiguration.loadConfiguration()", manually check it for 
> whatever setting you put in the config,
> and enable checkpointing based on that.
>
> Note that we discourage the use of the GlobalConfiguration in general, 
> and the above may not work anymore in an upcoming version.


Re: Is there a way to enable checkpointing from flink-conf.yaml?

Posted by Chesnay Schepler <ch...@apache.org>.
There is no option that enables checkpointing for all jobs.

If you have control over /all/ jobs, as a *hack*, you could load the 
configuration manually (I don't think it is exposed through the 
execution environment) using "GlobalConfiguration.loadConfiguration()", 
manually check it for whatever setting you put in the config,
and enable checkpointing based on that.

Note that we discourage the use of the GlobalConfiguration in general, 
and the above may not work anymore in an upcoming version.
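
The hack described above could look roughly like the following sketch. To 
keep it runnable without Flink on the classpath, it parses the flat 
"key: value" format by hand instead of calling 
GlobalConfiguration.loadConfiguration(). The key 
custom.checkpoint.interval.ms and the class CheckpointFromConfig are 
hypothetical names chosen for illustration; they are not Flink options.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Sketch: decide whether to enable checkpointing from a custom config entry. */
final class CheckpointFromConfig {

    // Hypothetical key. Flink does not define or read it; the job code must.
    static final String INTERVAL_KEY = "custom.checkpoint.interval.ms";

    /** Parses flat "key: value" lines, skipping blanks, comments, and malformed lines. */
    static Map<String, String> parseFlatConfig(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : text.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            // Split on the first ": " so values such as file:///path stay intact.
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue;
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    /** Returns the interval in milliseconds if the key is set to a positive number. */
    static OptionalLong checkpointInterval(Map<String, String> conf) {
        String value = conf.get(INTERVAL_KEY);
        if (value == null) {
            return OptionalLong.empty();
        }
        try {
            long ms = Long.parseLong(value);
            return ms > 0 ? OptionalLong.of(ms) : OptionalLong.empty();
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        String sample = "restart-strategy: fixed-delay\n" + INTERVAL_KEY + ": 60000\n";
        OptionalLong interval = checkpointInterval(parseFlatConfig(sample));
        // In a real job, this is where flinkStreamEnv.enableCheckpointing(ms)
        // would be called; here the decision is only printed.
        interval.ifPresent(ms ->
                System.out.println("would enable checkpointing every " + ms + " ms"));
    }
}
```

Every job has to run this check itself, which is why the approach only 
works when you control all jobs on the cluster.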
