Posted to user@flink.apache.org by Trystan <en...@gmail.com> on 2020/05/06 18:46:23 UTC

Shared Checkpoint Cleanup and S3 Lifecycle Policy

Hello!

Recently we ran into an issue when checkpointing to S3. Because S3
rate-limits by prefix, the /shared directory would get slammed and
cause S3 throttling. We see no way around this, because
/job/checkpoint/:id/shared is all part of one prefix, and each prefix is
limited to 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second.

(source:
https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html)
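
As a rough illustration of the per-prefix limits quoted above, the sketch
below estimates how many distinct prefixes a given request rate would need.
Only the 3,500 and 5,500 figures come from the AWS page; the function name
and the request rates in the example are hypothetical, and the even spread of
requests across prefixes is an assumption.

```python
import math

# Per-prefix limits quoted from the AWS documentation linked above.
WRITE_LIMIT_PER_PREFIX = 3500  # PUT/COPY/POST/DELETE requests per second
READ_LIMIT_PER_PREFIX = 5500   # GET/HEAD requests per second


def prefixes_needed(write_rps: float, read_rps: float) -> int:
    """Smallest number of prefixes that keeps both rates under the limits,
    assuming requests are spread evenly across the prefixes."""
    for_writes = math.ceil(write_rps / WRITE_LIMIT_PER_PREFIX)
    for_reads = math.ceil(read_rps / READ_LIMIT_PER_PREFIX)
    return max(1, for_writes, for_reads)


# Hypothetical peak: 9,000 writes/s and 2,000 reads/s during a checkpoint.
print(prefixes_needed(9000, 2000))  # 3
```

With everything under a single shared prefix, the effective count is 1, so
any peak above the per-prefix limit can be throttled.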

Jobs sometimes also completely crash, and they leave state lying around
when we bring the job up fresh.

Our workarounds have been to 1) reduce the number of taskmanagers, 2) reduce
state.backend.rocksdb.checkpoint.transfer.thread.num back to 1 (we had
increased it to speed up checkpoints and savepoints), and 3) manually delete
tons of old files in the shared directory.

My question:
Can we safely apply a Lifecycle Policy to the directory/bucket to remove
things? How long is stuff under /shared retained? Is it only for the
duration of the oldest checkpoint, or could it carry forward, untouched,
from the very first checkpoint to the very last? This shared checkpoint
dir/prefix is currently limiting some scalability of our jobs. I don't
believe the _entropy_ trick would help this, because the issue is
ultimately that there's a single shared directory.

Thank you!
Trystan

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

Posted by Congxian Qiu <qc...@gmail.com>.
Hi

Currently, it is hard to determine which files in the shared folder can be
deleted safely; the ground truth is in the checkpoint metadata file. I've
created an issue[1] to track such a feature.
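
To make the idea concrete, here is a minimal sketch of what a cleanup tool
could compute once the set of referenced files is known. Extracting that set
from the checkpoint metadata is the hard part and is not shown. The function
name, the file names, and the min_age safety margin are all hypothetical and
not part of Flink.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Set


def orphan_candidates(
    listing: Dict[str, datetime],
    referenced: Set[str],
    now: datetime,
    min_age: timedelta,
) -> Set[str]:
    """Files that no retained checkpoint references and that are at least
    min_age old, so recent uploads of a running checkpoint are skipped."""
    cutoff = now - min_age
    return {
        name
        for name, last_modified in listing.items()
        if name not in referenced and last_modified <= cutoff
    }


now = datetime(2020, 5, 8, 12, 0, tzinfo=timezone.utc)
listing = {
    "shared/f1": now - timedelta(days=3),     # referenced, must stay
    "shared/f2": now - timedelta(days=3),     # unreferenced and old
    "shared/f3": now - timedelta(minutes=1),  # unreferenced, maybe in flight
}
referenced = {"shared/f1"}  # assumed to come from the checkpoint metadata
print(sorted(orphan_candidates(listing, referenced, now, timedelta(hours=1))))
# ['shared/f2']
```

Note that age alone never makes a file a candidate here; it only excludes
files that are too recent to judge.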

[1] https://issues.apache.org/jira/browse/FLINK-17571
Best,
Congxian



Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

Posted by Trystan <en...@gmail.com>.
Aha, so incremental checkpointing *does* rely on infinitely-previous
checkpoint state, regardless of the incremental retention number. The
documentation wasn't entirely clear about this. One would assume that if
you retain 3 checkpoints, anything older than the 3rd is irrelevant, but
that's evidently not true. So it is never safe to delete any files in
/shared, because we can't know which files belong to the current job (and
may have lived on from checkpoint 1 even though we're on checkpoint 10 and
only "retain" 3) and which ones have been abandoned altogether (due to a
previous run of the job where we didn't restore state).

This is really unfortunate - it can lead to a case where you accumulate a
huge number of files in S3 and you can't know which ones to delete,
especially if the job id remains the same (for job mode, they're all
zeros). So this shared state lives on forever and there is no way to ever
clean it up, at all. I am surprised that this hasn't been a problem for
anyone else. Maybe I should just file a feature request for this, at least
to find some solution for ways to clean up these directories.

I appreciate your patience and help, thank you so much!

Trystan


Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

Posted by Congxian Qiu <qc...@gmail.com>.
Hi

Yes, there should only be files used by checkpoints 8, 9, and 10 in the
checkpoint file. But you cannot delete files just because they were created
more than 3 minutes ago: checkpoints 8, 9, and 10 may reuse files created by
earlier checkpoints. This is how incremental checkpointing works[1].
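
The point can be shown with a toy model. This is only a sketch: the file
names, the checkpoint numbers, and the reference sets are invented for
illustration, and real checkpoints record their references in the checkpoint
metadata rather than in a Python dict.

```python
from typing import Dict, List, Set


def live_shared_files(refs: Dict[int, Set[str]], retained: List[int]) -> Set[str]:
    """Union of the shared files referenced by the retained checkpoints."""
    live: Set[str] = set()
    for checkpoint_id in retained:
        live |= refs[checkpoint_id]
    return live


# Toy history: file name -> checkpoint that first uploaded it.
created_at = {"sst-a": 1, "sst-b": 8, "sst-c": 9, "sst-d": 10}

# Shared files referenced by each retained checkpoint. "sst-a" was uploaded
# by checkpoint 1 and never rewritten, so later checkpoints keep reusing it.
refs = {
    8: {"sst-a", "sst-b"},
    9: {"sst-a", "sst-b", "sst-c"},
    10: {"sst-a", "sst-c", "sst-d"},
}

live = live_shared_files(refs, retained=[8, 9, 10])

# An age-based rule ("delete anything uploaded before checkpoint 8") would
# pick files that the retained checkpoints still need.
oldest_retained = 8
wrongly_deleted = {f for f in live if created_at[f] < oldest_retained}
print(sorted(wrongly_deleted))  # ['sst-a']
```

In this model an S3 lifecycle rule keyed on object age would remove sst-a
and break all three retained checkpoints.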

You can also check the directory structure of checkpoint files[2] for more
information. Copied from the website:
> The SHARED directory is for state that is possibly part of multiple
> checkpoints, TASKOWNED is for state that must never be dropped by the
> JobManager, and EXCLUSIVE is for state that belongs to one checkpoint only.

For entropy injection, you can enable it as the documentation describes. It
replaces the entropy key in the path with a random string of the specified
length, so that the files are not all in the same directory.
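
As an illustration of the substitution only, the sketch below mimics the
effect on a path. It is not Flink's implementation: the function, the bucket
name, and the path layout are hypothetical, and the key and length would in
practice come from the s3.entropy.key and s3.entropy.length options described
in the linked S3 documentation.

```python
import random
import string


def inject_entropy(path: str, entropy_key: str, length: int, rng: random.Random) -> str:
    """Replace the first occurrence of entropy_key with `length` random
    alphanumeric characters. Paths without the key are returned unchanged."""
    if entropy_key not in path:
        return path
    alphabet = string.ascii_lowercase + string.digits
    entropy = "".join(rng.choice(alphabet) for _ in range(length))
    return path.replace(entropy_key, entropy, 1)


rng = random.Random(0)  # seeded only to make the demo repeatable
template = "s3://my-bucket/checkpoints/_entropy_/job/shared/file-1"  # hypothetical
for _ in range(3):
    print(inject_entropy(template, "_entropy_", 4, rng))
```

Because the random segment sits in the checkpoint directory path, ahead of
shared/, files written to shared/ land under different key prefixes even
though they belong to the same logical directory.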

[1]
https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
Best,
Congxian



Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

Posted by Trystan <en...@gmail.com>.
Thanks Congxian! To make sure I'm understanding correctly, if I retain 3
incremental checkpoints (say every minute), and I've just completed
checkpoint 10, then anything in shared is from checkpoint 8 and 9 only. So
anything older than ~3 minutes can safely be deleted? The state from
checkpoint 5 doesn't live on in the shared directory - at all?

I ask because we have run into cases where we end up abandoning the state,
and Flink does not clean up state from, say, a previous iteration of the
job if you don't restore state. We need to remove these files
automatically, but I want to be sure that I don't blow away older files in
the shared dir from earlier, subsumed checkpoints - but you are saying that
isn't possible, and that all subsumed checkpoints will have their /shared
state rewritten or cleaned up as needed, correct?

As for entropy, where would you suggest using it? My understanding is that
I don't control anything beyond the checkpoint directory, and since shared
is in that directory I can't put entropy inside the shared directory itself
(which is what I would need).

Thanks,
Trystan


Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

Posted by Congxian Qiu <qc...@gmail.com>.
Hi
For the rate limit, could you please try entropy injection[1]?
For checkpoints, Flink handles the file lifecycle itself (it deletes a file
once it will never be used again). A file stays in the checkpoint directory
as long as a checkpoint that references it is still valid.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/s3.html#entropy-injection-for-s3-file-systems
Best,
Congxian

