Posted to user@flink.apache.org by Meghajit Mazumdar <me...@gojek.com> on 2022/09/23 08:46:21 UTC

Flink Operator: Checkpointing and Upgrade Modes

Hello folks,

My team and I are working on migrating our existing Flink jobs to a
Native Kubernetes Application Mode deployment using the Flink Operator.

We are a bit confused about the relationship between the upgrade modes
provided by the Flink operator and recovery from checkpoints.

For example, we recently ran an experiment in which we wrote a
FlinkDeployment CRD with a Kafka source, checkpointing enabled, and
Kubernetes HA. My CRD has these configs specified under flinkConfiguration:

state.backend: filesystem
state.backend.fs.checkpointdir: gs://my-bucket/flink/data-kj-flink-operator-flink/flink-checkpoints
state.savepoints.dir: gs://my-bucket/flink/data-kj-flink-operator-flink/flink-savepoints

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: gs://my-bucket/flink/data-kj-flink-operator-flink/ha


We then configured this CRD with *upgradeMode* set to *stateless*:

job:
  jarURI: local:///opt/flink/opt/my-app.jar
  parallelism: 2
  upgradeMode: stateless


We then deployed this CRD. Next, we tried killing the job manager and task
manager to simulate a failure scenario.

We saw that after any job manager or task manager failure, state was
successfully recovered from the last checkpoint and the output was correct.
Based on our understanding of the documentation for Stateless mode
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades:~:text=Stateless%20application%20upgrades%20from%20empty%20state>,
we expected the job to start from the beginning with no knowledge of
previous checkpoints. However, that doesn't seem to be the case.

[image: Screenshot 2022-09-23 at 2.07.42 PM.png]


Can you please elaborate on how checkpoints and checkpoint recovery
work with Stateless mode and the other upgrade modes?

Thanks.

-- 
*Regards,*
*Meghajit*

Re: Flink Operator: Checkpointing and Upgrade Modes

Posted by Gyula Fóra <gy...@gmail.com>.
Hi!

The upgradeMode only affects how the operator executes upgrades when the
spec changes. It does not affect the failure recovery behaviour of the
Flink cluster or job.

Enabling checkpointing and setting upgradeMode to stateless means that
your job will recover from the last checkpoint if it fails, but when you
modify the spec, the operator will start it again from a completely empty
state, not from the last checkpoint.
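
To make "modify the spec" concrete, here is a minimal sketch that reuses the
job section from the original message. The change of parallelism from 2 to 4
is only an illustrative assumption; any edit to the spec would count as a
spec change.

```yaml
job:
  jarURI: local:///opt/flink/opt/my-app.jar
  # Hypothetical edit: parallelism changed from 2 to 4. This is a spec
  # change, so the operator performs an upgrade.
  parallelism: 4
  # With stateless, the upgraded job starts from empty state rather than
  # from the last checkpoint. Failure recovery between upgrades is unchanged.
  upgradeMode: stateless
```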

Similarly, the savepoint and last-state upgrade modes don't affect
runtime failure recovery behaviour, only the upgrade process when the YAML
spec changes.
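
For comparison, a sketch of the same job section with a stateful upgrade
mode. This assumes the setup from the original message (Kubernetes HA
enabled and state.savepoints.dir configured), which to my understanding is
what the last-state and savepoint modes respectively rely on; please check
the operator documentation for your version.

```yaml
job:
  jarURI: local:///opt/flink/opt/my-app.jar
  parallelism: 2
  # last-state: on a spec change, the job is restored from the latest
  #             checkpoint information kept in the HA metadata.
  # savepoint:  on a spec change, the job is stopped with a savepoint and
  #             the upgraded job is restored from it.
  upgradeMode: last-state
```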

Cheers,
Gyula
