Posted to user@flink.apache.org by Francesco Leone <le...@gmail.com> on 2023/05/02 11:58:08 UTC

The JobManager is taking minutes to complete and finalize checkpoints even though the TaskManagers seem to complete them in a few seconds

Hi There,

We are facing a problem with checkpoints in our Flink cluster, which
consists of 38 TaskManagers and 2 JobManagers (HA). We are running Flink
1.15.2 on OpenJDK 11, and the checkpoints are stored in AWS S3 through the
Presto S3 filesystem.

The overall checkpoint duration is reported as 8 seconds in the Flink UI
(screenshot attached), but the JobManager logs report about 328 seconds
(checkpointDuration plus finalizationTime), and checkpoints are taken
roughly every 6 minutes instead of at the configured interval of 1 minute.

The JobManager logs report:
message: "Completed checkpoint 2515895 for job
fdc83bb7b697b8eab84238cd588805ef (7242148389 bytes,
checkpointDuration=150047 ms, finalizationTime=178119 ms)."
logger_name: "o.a.f.r.c.CheckpointCoordinator"
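
For reference, the ~328 seconds quoted above is the sum of the two timings
in that log message. A minimal sketch that extracts and adds them, assuming
the message text has exactly the shape shown above (the class and method
names are hypothetical, not part of Flink):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class CheckpointLogLine {
    // Matches the two timing fields of the "Completed checkpoint" message.
    private static final Pattern TIMINGS =
            Pattern.compile("checkpointDuration=(\\d+) ms, finalizationTime=(\\d+) ms");

    /** Returns checkpointDuration + finalizationTime in milliseconds. */
    static long totalMillis(String logMessage) {
        Matcher m = TIMINGS.matcher(logMessage);
        if (!m.find()) {
            throw new IllegalArgumentException("no checkpoint timings in: " + logMessage);
        }
        return Long.parseLong(m.group(1)) + Long.parseLong(m.group(2));
    }

    public static void main(String[] args) {
        String msg = "Completed checkpoint 2515895 for job fdc83bb7b697b8eab84238cd588805ef "
                + "(7242148389 bytes, checkpointDuration=150047 ms, finalizationTime=178119 ms).";
        System.out.println(totalMillis(msg) + " ms"); // 328166 ms
    }
}
```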

---------------------------------------------------------------
The checkpoint configuration is

CheckpointConfig config = ...
// Keep externalized checkpoints when the job is cancelled.
config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Only one checkpoint may be in flight at a time.
config.setMaxConcurrentCheckpoints(1);

checkArgument(maxFailures >= 0);
config.setTolerableCheckpointFailureNumber(maxFailures);

config.setCheckpointInterval(60 * 1000);       // 1 minute
config.setCheckpointTimeout(15 * 60 * 1000);   // 15 minutes
config.setMinPauseBetweenCheckpoints(5000);    // 5 seconds

In the flink-conf.yaml
-----------------------------------------------------------------------------------
presto.s3.ssl.enabled: true
presto.s3.sse.enabled: true
presto.s3.sse.type: S3
fs.s3a.buffer.dir: /mnt/data/tmp

s3.entropy.key: HASH
s3.entropy.length: 4

state.backend: rocksdb
state.checkpoints.dir: s3p://xxxxxxxxxxxxxxxxxxxx/checkpoints/HASH/

state.storage.fs.memory-threshold: 1048576
state.storage.fs.write-buffer-size: 4194304
state.savepoints.dir: s3p://xxxxxxxxxxxxxxxxxxxxxx/savepoints/xxxxxxxx

state.checkpoints.num-retained: 4
state.backend.local-recovery: false
state.backend.incremental: true
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
state.backend.rocksdb.options-factory: custom.RocksDbOptionsFactory
state.backend.rocksdb.compression: ZSTD_COMPRESSION

execution.checkpointing.unaligned: true
taskmanager.state.local.root-dirs: /mnt/data/tmp/local
-------------------------------------------------------------------------------------

Thanks
Kind Regards

Francesco

Re: The JobManager is taking minutes to complete and finalize checkpoints even though the TaskManagers seem to complete them in a few seconds

Posted by Yanfei Lei <fr...@gmail.com>.
Hi Francesco,

The overall checkpoint duration shown in the Flink UI is the
EndToEndDuration [1]: the time from the JobManager triggering the
checkpoint until it has collected the last ack message from the
TaskManagers. It is therefore determined by the slowest TaskManager.
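
As a rough illustration of that definition, the sketch below computes an
end-to-end duration from a trigger timestamp and a list of ack timestamps.
The class name, method name, and sample timestamps are hypothetical; this
is not the code behind [1], only the same idea in isolation.

```java
import java.util.List;

final class EndToEndDurationSketch {
    /**
     * Time from the trigger to the latest acknowledgement, in milliseconds.
     * Clamped at zero so that clock skew cannot yield a negative duration.
     */
    static long endToEndMillis(long triggerTimestamp, List<Long> ackTimestamps) {
        long latestAck = triggerTimestamp;
        for (long ack : ackTimestamps) {
            latestAck = Math.max(latestAck, ack);
        }
        return Math.max(0, latestAck - triggerTimestamp);
    }

    public static void main(String[] args) {
        // Hypothetical acks: most tasks answer within a few seconds, the slowest after 8 s.
        List<Long> acks = List.of(2_000L, 3_500L, 8_000L);
        System.out.println(endToEndMillis(0L, acks)); // 8000
    }
}
```

Nothing that happens on the JobManager after the last ack, such as
finalization, enters this number.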

> message: "Completed checkpoint 2515895 for job fdc83bb7b697b8eab84238cd588805ef (7242148389 bytes, checkpointDuration=150047 ms, finalizationTime=178119 ms)."
> logger_name: "o.a.f.r.c.CheckpointCoordinator"

The "checkpointDuration" in this log covers the time from the pending
checkpoint to the completed checkpoint [2], and "finalizationTime"
includes the removal of older checkpoints.

> "The checkpoints are taken roughly every 6 minutes instead of the configured interval of 1 minute."

When the maximum number of concurrent checkpoints is set to 1, the next
checkpoint is triggered only after the previous checkpoint has completed.
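
To make the arithmetic concrete, here is a deliberately simplified model of
the spacing between checkpoints when only one may be in flight. It assumes
the next trigger waits for the previous checkpoint to finish (duration plus
finalization, as in the log above) and for the minimum pause; the class and
method names are hypothetical, and the real CheckpointCoordinator logic is
more involved.

```java
final class CheckpointSpacing {
    /**
     * Effective spacing between two consecutive triggers, in milliseconds,
     * under the simplified model: never sooner than the configured interval,
     * and never before the previous checkpoint has completed and the minimum
     * pause has elapsed.
     */
    static long effectiveIntervalMillis(long intervalMs, long minPauseMs, long completionMs) {
        return Math.max(intervalMs, completionMs + minPauseMs);
    }

    public static void main(String[] args) {
        long intervalMs = 60_000L;              // setCheckpointInterval(1000 * 60)
        long minPauseMs = 5_000L;               // setMinPauseBetweenCheckpoints(5000)
        long completionMs = 150_047L + 178_119L; // checkpointDuration + finalizationTime
        System.out.println(effectiveIntervalMillis(intervalMs, minPauseMs, completionMs)); // 333166
    }
}
```

Under these assumptions the spacing comes out at about 333 seconds, which is
in line with the reported "roughly every 6 minutes".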

[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java#L188
[2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1378

On Tue, May 2, 2023 at 20:14, David Morávek <dm...@apache.org> wrote:
>
> Hi Francesco,
>
> Finalization also includes the removal of older checkpoints (we're only keeping the last N checkpoints), which could be pretty costly in the case of RocksDB (many small files). Can you check how long the removal of old checkpoint files from S3 takes (there might be some rate limiting involved, for example)?
>
> Best,
> D.
>



-- 
Best,
Yanfei

Re: The JobManager is taking minutes to complete and finalize checkpoints even though the TaskManagers seem to complete them in a few seconds

Posted by David Morávek <dm...@apache.org>.
Hi Francesco,

Finalization also includes the removal of older checkpoints (we're only
keeping the last N checkpoints), which could be pretty costly in the case
of RocksDB (many small files). Can you check how long the removal of old
checkpoint files from S3 takes (there might be some rate limiting involved,
for example)?
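
One way to get a first measurement is a small timing harness like the sketch
below. It is only an illustration: DeleteTimer, the object keys, and the
stand-in deleter are hypothetical, and the deleter would have to be replaced
with whatever S3 client call you actually use.

```java
import java.util.List;
import java.util.function.Consumer;

final class DeleteTimer {
    /** Calls the deleter once per key and returns the total elapsed time in milliseconds. */
    static long timeDeletesMillis(List<String> keys, Consumer<String> deleter) {
        long start = System.nanoTime();
        for (String key : keys) {
            deleter.accept(key);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Stand-in deleter that only sleeps; swap in a real S3 delete call here.
        Consumer<String> fakeDelete = key -> {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> keys = List.of("chk-1/a", "chk-1/b", "chk-1/c"); // hypothetical keys
        long elapsed = timeDeletesMillis(keys, fakeDelete);
        System.out.println(elapsed + " ms for " + keys.size() + " deletes");
    }
}
```

Comparing the per-object time against the number of files in a retained
checkpoint gives a rough idea of whether deletion alone could explain the
finalization time.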

Best,
D.
