Posted to user@flink.apache.org by Aian Cantabrana <ac...@zylk.net> on 2021/09/09 12:01:27 UTC
Flink Checkpoints stored in TaskManager
Hi,
I am running a Flink job that uses Flink's state, and I have set RocksDB as the StateBackend. I have also enabled checkpointing.
This is my flink-conf.yaml:
jobmanager.rpc.address: jobmanager
jobmanager.rpc.port: 6123
jobmanager.heap.size: 4096m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 32
parallelism.default: 1
state.backend: rocksdb
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.checkpoints.dir: file:///opt/flink/flink-checkpoints
state.backend.incremental: false
execution.checkpointing.interval: 1800000
state.checkpoints.num-retained: 1
jobmanager.execution.failover-strategy: region
blob.server.port: 6124
query.server.port: 6125
The problem is that checkpoints are being stored not only in the JobManager but also in the TaskManagers. While the JobManager keeps only the last checkpoint, the TaskManagers keep all of them, filling up disk space.
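JobManager: (attachment not preserved in this plain-text version)
TaskManager: (attachment not preserved in this plain-text version)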
Everything seems to be working correctly, but this accumulation of checkpoints forces me to restart the job about once a month to clean up all the old checkpoints.
I am running the Flink cluster in Docker with just one TaskManager. I guess I am missing a configuration parameter, but I have been reading Flink's documentation and could not find the issue.
I also tried enabling incremental checkpointing, but I experienced the same behaviour.
Thank you in advance for any help,
Aian
--
-----------------------------------------
Aian Cantabrana
ZYLK.net :: consultoría.openSource
Ribera de Axpe, 11
Edificio A, modulo 201-203
48950 Erandio (Bizkaia)
telf.: +34 747421343
ofic.: +34 944272119
-----------------------------------------
Re: Flink Checkpoints stored in TaskManager
Posted by Robert Metzger <rm...@apache.org>.
Hey,
You are setting state.checkpoints.dir to a local directory. This needs
to be a filesystem that all TaskManager instances can access, and that
remains available even if a TaskManager instance fails.
People typically use S3 (or MinIO), HDFS, etc. for that.
This is basically the directory where Flink periodically writes a "backup"
(hence the name checkpoint) of all its current state. When a TaskManager or
a machine fails, Flink can restore the state from there.
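
As an illustration of the advice above, a flink-conf.yaml fragment along these lines would point the checkpoint directory at shared storage instead of a container-local path. This is a sketch under assumptions, not a tested setup: the bucket name, the MinIO endpoint, the credentials placeholders, and the HDFS namenode address are all hypothetical, and the S3 scheme only works if one of Flink's S3 filesystem plugins (flink-s3-fs-hadoop or flink-s3-fs-presto) is installed in the plugins directory. It probably also explains the accumulation described in the question: with a file:// path inside separate containers, each process writes to its own disk, so the JobManager cannot see or delete the files the TaskManager wrote.

```yaml
# flink-conf.yaml (fragment): only the checkpoint location changes.
# "my-flink-bucket" is a hypothetical name; any store reachable from the
# JobManager and every TaskManager would serve the same purpose.
state.backend: rocksdb
state.backend.rocksdb.localdir: /opt/flink/rocksdb   # RocksDB working files stay local
state.checkpoints.dir: s3://my-flink-bucket/flink-checkpoints
state.checkpoints.num-retained: 1

# Assumed settings for an S3-compatible store such as MinIO;
# the endpoint and credentials are placeholders.
s3.endpoint: http://minio:9000
s3.path.style.access: true
s3.access-key: <access-key>
s3.secret-key: <secret-key>

# HDFS alternative (hypothetical namenode address):
# state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
```

In a single-host Docker setup, mounting the same volume at the same path in both the JobManager and TaskManager containers and keeping a file:// URI may be enough, since both processes would then see one directory.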