Posted to user@flink.apache.org by Abdullah Alkawatrah <ak...@gmail.com> on 2022/04/04 21:53:02 UTC

Local recovery with 1.15 snapshot

Hey,
Local recovery across TaskManager restarts, introduced in 1.15, seems like a
great feature. Unfortunately, I have not been able to make it work.

I am trying this with a streaming pipeline that consumes events from Kafka
topics and uses RocksDB for stateful operations.

*Setup*:

   - Job manager: k8s deployment (ZK HA)
   - Task manager: k8s statefulset
   - Using S3 for checkpoint storage
   - This is a standalone deployment in k8s (not using flink native k8s)

I followed the steps here
<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/working_directory/#artifacts-stored-in-the-working-directory>
to enable local recovery, set a resource-id and configure a working
directory pointing to the attached persistent volume. These configs can be
seen below.
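
For additional context, this is roughly how the TaskManager StatefulSet mounts
the persistent volume at the working directory. It is a simplified, illustrative
excerpt rather than the exact manifest; the StatefulSet name, labels, replica
count, storage size and image tag are placeholders:

# simplified, illustrative excerpt of the TaskManager StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-pipeline-pod          # pods become flink-pipeline-pod-0, -1, -2, ...
spec:
  serviceName: flink-taskmanager
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
        - name: taskmanager
          image: apache/flink:1.15.0
          volumeMounts:
            - name: flink-working-dir
              # mounted at the configured process.taskmanager.working-dir
              mountPath: /data/flink
  volumeClaimTemplates:
    - metadata:
        # one PVC per pod, re-attached when the pod is recreated
        name: flink-working-dir
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 50Gi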

*flink-conf.yaml*:
jobmanager.rpc.port: 6565
jobmanager.heap.size: 1024m
jobmanager.execution.failover-strategy: region
blob.server.port: 6124
taskmanager.rpc.port: 6565
taskmanager.data.port: 6124
task.cancellation.interval: 300000
high-availability: zookeeper
high-availability.jobmanager.port: 6565
high-availability.zookeeper.path.root: /flink-pipeline
high-availability.storageDir: s3://flink-state/ha
high-availability.zookeeper.quorum: 196.10.20.10
hive.s3.use-instance-credentials: false
s3.endpoint: s3.amazonaws.com:443
state.backend: rocksdb
state.backend.local-recovery: true
state.backend.rocksdb.localdir: /data/flink/data
process.taskmanager.working-dir: /data/flink
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.checkpoint.transfer.thread.num: 1
state.backend.incremental: true
web.submit.enable: false
akka.ask.timeout: 10 min
restart-strategy.failure-rate.max-failures-per-interval: 20
cluster.evenly-spread-out-slots: true
fs.s3.limit.input: 2
fs.s3.limit.output: 2
slot.idle.timeout: 180000

high-availability.cluster-id: cluster-1
state.checkpoints.dir: s3://flink-state/checkpoints/flink-pipeline
taskmanager.resource-id: flink-pipeline-pod-2
jobmanager.rpc.address: 196.10.20.20
query.server.port: 6125
taskmanager.host: 196.10.20.30
taskmanager.memory.process.size: 9216m
taskmanager.numberOfTaskSlots: 1

Checking the working directory (/data/flink), I can see the following entries:
ls -l
total 8
drwxr-sr-x 21 flink daemon 4096 Apr 4 21:16 data
drwxrwsr-x 2 flink daemon 4096 Feb 24 09:07 localState
drwxr-sr-x 6 flink daemon 4096 Apr 4 21:15 tm_flink-pipeline-pod-2

When a pod is restarted, the same persistent volume is attached to the
newly created pod, so I expect local recovery to be triggered in this case.
However, based on the logs, it is always the remote recovery handler that is
executed. I could also verify this behaviour by monitoring disk usage on the
TM pods: usage on the volume drops to near zero when the pipeline starts up,
which suggests the local state left behind is discarded rather than reused.
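
For reference, this is roughly the check I run after a pod restart. I am
assuming here that the k8s pod is named after its taskmanager.resource-id;
the paths are the ones from the listing above:

# size of the locally kept state, before and after the restart
# (pod name assumed to match taskmanager.resource-id)
kubectl exec flink-pipeline-pod-2 -- du -sh /data/flink/localState /data/flink/tm_flink-pipeline-pod-2
# look at which recovery path was taken
kubectl logs flink-pipeline-pod-2 | grep -i "recovery"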

Initially I thought that increasing slot.idle.timeout could help, but even
setting it to a much longer duration made no difference.

I would appreciate any help with this and can provide further details if needed.

Regards,
Abdallah.