Posted to user@flink.apache.org by prashantnayak <pr...@intellifylearning.com> on 2017/07/13 20:43:42 UTC

Re: S3 recovery and checkpoint directories exhibit explosive growth

To add one more data point... it seems like the recovery directory is
somehow the bottleneck: if we delete the recovery directory and restart the
job manager, it comes back and is responsive.

Of course, we lose all jobs, since none can be recovered... and that is of
course not ideal.

So the question seems to be why the recovery directory grows exponentially
in the first place.

I can't imagine we're the only ones to see this... or we must be configuring
something wrong while testing Flink 1.3.1.

Thanks for your help in advance

Prashant



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14271.html

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by Bowen Li <bo...@offerupnow.com>.
Hi Stephan,
    Making Flink's S3 integration independent of Hadoop is great. We've
been running into a lot of Hadoop configuration trouble when trying to
enable Flink checkpointing with S3 on AWS EMR.

    Are there any concrete plans or tickets created yet for tracking this?

Thanks,
Bowen


On Mon, Jul 24, 2017 at 11:12 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Prashant!
>
> Flink's S3 integration currently goes through Hadoop's S3 file system (as
> you probably noticed).
>
> It seems that Hadoop's S3 file system is not really well suited for
> what we want to do, and we are looking to drop it and replace it by
> something direct (independent of Hadoop) in the coming release...
>
> One essential thing to make sure is to not have the "trash" activated in
> the configuration, as it adds very high overhead to the delete operations.
>
> Best,
> Stephan
>
>
> On Mon, Jul 24, 2017 at 7:56 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Prashant!
>>
>> I assume you are using Flink 1.3.0 or 1.3.1?
>>
>> Here are some things you can do:
>>
>>   - I would try and disable the incremental checkpointing for a start
>> and see what happens then. That should reduce the number of files already.
>>
>>   - Is it possible for you to run a patched version of Flink? If yes, can
>> you try to do the following: In the class "FileStateHandle", in the method
>> "discardState()", remove the code around "FileUtils.deletePathIfEmpty(...)"
>> - this is probably not working well when hitting too many S3 files.
>>
>>   -  You can delete old "completedCheckpointXXXYYY" files, but please do
>> not delete the other two types, they are needed for HA recovery.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Mon, Jul 24, 2017 at 3:46 AM, prashantnayak <
>> prashant@intellifylearning.com> wrote:
>>
>>> Hi Xiaogang and Stephan
>>>
>>> We're continuing to test and have now set up the cluster to disable
>>> incremental RocksDB checkpointing as well as increasing the checkpoint
>>> interval from 30s to 120s  (not ideal really :-( )
>>>
>>> We'll run it with a large number of jobs and report back if this setup
>>> shows
>>> improvement.
>>>
>>> Appreciate any other insights you might have on this problem.
>>>
>>> Thanks
>>> Prashant
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14392.html
>>>
>>
>>
>

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by prashantnayak <pr...@intellifylearning.com>.
Thanks Stefan.

+1 on "I am even considering packing this list as a plain text file with the
checkpoint, to make this more transparent for users"

That is definitely more Ops-friendly...

Thanks
Prashant



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14479.html

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

your concerns about deleting files when using incremental checkpoints are very valid. Deleting empty checkpoint folders is obviously ok. As for files, I have recently added some additional logging to the checkpointing mechanism to report the files referenced in the last checkpoint, and I will try to also include that logging in 1.3.2. Based on this, you could make safe assumptions about which files are actually orphaned. I am even considering packing this list as a plain text file with the checkpoint, to make this more transparent for users.

Best,
Stefan

> Am 26.07.2017 um 16:57 schrieb prashantnayak <pr...@intellifylearning.com>:
> 
> Thanks Stephan and Stefan
> 
> We're looking forward to this patch in 1.3.2
> 
> We will use a patched version depending upon when 1.3.2 is going to be
> available.
> 
> We're also implementing a cron job to remove orphaned/older
> completedCheckpoint files per your recommendations..  one caveat with a job
> like that is that we have to check if a particular job is
> stopped/paused/down and also if the Job Manager is down so we don't
> accidentally remove valid checkpoint files..   this makes it a bit dicey....
> ideal of course is not to have to do this. 
> 
> The move away from hadoop/s3 would be welcome as well.
> 
> Flink job state is critical to us since we have very long running jobs
> (months) processing hundreds of millions of records.  
> 
> Thanks
> Prashant
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14477.html


Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by prashantnayak <pr...@intellifylearning.com>.
Thanks Stephan and Stefan

We're looking forward to this patch in 1.3.2

Depending on when 1.3.2 is going to be available, we will use a patched
version in the meantime.

We're also implementing a cron job to remove orphaned/older
completedCheckpoint files per your recommendations. One caveat with a job
like that is that we have to check whether a particular job is
stopped/paused/down, and also whether the Job Manager is down, so we don't
accidentally remove valid checkpoint files... this makes it a bit dicey.
Ideally, of course, we would not have to do this at all.
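
For what it's worth, below is a rough sketch of the kind of cleanup job we
are putting together (assuming the AWS SDK for Java v1; the bucket and
prefix are placeholders, it only touches completedCheckpoint files, and it
deliberately leaves out the job-health checks mentioned above, so it is a
starting point rather than something we would run as-is):

```
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class RecoveryDirCleanup {

	public static void main(String[] args) {
		// Placeholder bucket/prefix; substitute the actual HA recovery location.
		String bucket = "our-flink-bucket";
		String prefix = "flink/recovery/";
		long maxAgeHours = 24;

		Date cutoff = new Date(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(maxAgeHours));
		AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

		ObjectListing listing = s3.listObjects(bucket, prefix);
		while (true) {
			for (S3ObjectSummary obj : listing.getObjectSummaries()) {
				// Only prune completedCheckpoint files; leave submittedJobGraph and
				// mesosWorkerStore files alone, as advised earlier in this thread.
				boolean isCompletedCheckpoint = obj.getKey().contains("completedCheckpoint");
				if (isCompletedCheckpoint && obj.getLastModified().before(cutoff)) {
					s3.deleteObject(bucket, obj.getKey());
				}
			}
			if (!listing.isTruncated()) {
				break;
			}
			listing = s3.listNextBatchOfObjects(listing);
		}
	}
}
```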

The move away from Hadoop for the S3 integration would be welcome as well.

Flink job state is critical to us, since we have very long-running jobs
(months) processing hundreds of millions of records.

Thanks
Prashant



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14477.html

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by Stephan Ewen <se...@apache.org>.
Stefan is correct in pointing out the lines to remove.

FYI: We are trying to add this patch to the upcoming 1.3.2 release which
should also help: https://github.com/apache/flink/pull/4397

On Wed, Jul 26, 2017 at 9:48 AM, Stefan Richter <s.richter@data-artisans.com
> wrote:

> Hi,
>
> I think Stephan was talking about removing this part:
>
>         try {
>                 FileUtils.deletePathIfEmpty(fs, filePath.getParent());
>         } catch (Exception ignored) {}
>
>
> This part should *NOT* be removed:
>
>         fs.delete(filePath, false);
>
> The reason is that the first is only an additional cleanup that, for each
> deleted file, checks if the containing directory is now empty and then
> removes the directory. Checking for empty directories includes listing all
> files in the directory, which is expensive in S3 and the only downside of
> removing the line are orphaned empty checkpoint directories, which could be
> cleaned by a script.
>
> Delete of the file itself must remain because that is how we release files
> from old checkpoints.
>
> Best,
> Stefan
>
> > Am 26.07.2017 um 04:31 schrieb prashantnayak <
> prashant@intellifylearning.com>:
> >
> > Hi Stephan
> >
> > Unclear on what you mean by the "trash" option... thought that was only
> > available for command line hadoop and not applicable for API, which is
> what
> > Flink uses?  If there is a configuration for the Flink/Hadoop connector,
> > please let me know.
> >
> > Also, one additional thing about S3.... S3 supports this option of
> > "versioned" buckets... Since versioning basically results in a delete on
> a
> > object not actually deleting it (unless there is a bucket lifecycle
> > policy)... I think you should recommend that Flink users that rely on S3
> > turn off bucket versioning since it seems to not really be a factor for
> > Flink...
> >
> > Thanks
> > Prashant
> >
> >
> >
> > --
> > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14453.html
>
>

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I think Stephan was talking about removing this part:

	try {
		FileUtils.deletePathIfEmpty(fs, filePath.getParent());
	} catch (Exception ignored) {}


This part should *NOT* be removed:

	fs.delete(filePath, false);

The reason is that the first snippet is only an additional cleanup that, for each deleted file, checks whether the containing directory is now empty and then removes the directory. Checking for empty directories requires listing all files in the directory, which is expensive on S3, and the only downside of removing the line is orphaned empty checkpoint directories, which could be cleaned up by a script.

The delete of the file itself must remain, because that is how we release files from old checkpoints.
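
For illustration, with only that cleanup removed, the patched method would look roughly like this (a sketch based on the snippet quoted in this thread, not necessarily the exact final patch):

	@Override
	public void discardState() throws Exception {

		FileSystem fs = getFileSystem();

		// Releasing the file of the old checkpoint must stay.
		fs.delete(filePath, false);

		// The FileUtils.deletePathIfEmpty(...) call that used to follow is gone:
		// it listed the parent directory on every delete, which is expensive on
		// S3. The only downside is orphaned empty checkpoint directories, which
		// a script can clean up separately.
	}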

Best,
Stefan

> Am 26.07.2017 um 04:31 schrieb prashantnayak <pr...@intellifylearning.com>:
> 
> Hi Stephan
> 
> Unclear on what you mean by the "trash" option... thought that was only
> available for command line hadoop and not applicable for API, which is what
> Flink uses?  If there is a configuration for the Flink/Hadoop connector,
> please let me know.
> 
> Also, one additional thing about S3.... S3 supports this option of
> "versioned" buckets... Since versioning basically results in a delete on a
> object not actually deleting it (unless there is a bucket lifecycle
> policy)... I think you should recommend that Flink users that rely on S3
> turn off bucket versioning since it seems to not really be a factor for
> Flink...
> 
> Thanks
> Prashant
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14453.html


Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by prashantnayak <pr...@intellifylearning.com>.
Hi Stephan

We're unclear on what you mean by the "trash" option... we thought that was
only available for the command-line hadoop tool and not applicable to the
API, which is what Flink uses. If there is a configuration option for the
Flink/Hadoop connector, please let me know.

Also, one additional thing about S3: S3 supports "versioned" buckets. Since
versioning basically means that deleting an object does not actually delete
it (unless there is a bucket lifecycle policy), I think you should recommend
that Flink users who rely on S3 turn off bucket versioning, since it does
not really seem to be a factor for Flink.

Thanks
Prashant



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14453.html

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by Stephan Ewen <se...@apache.org>.
Hi Prashant!

Flink's S3 integration currently goes through Hadoop's S3 file system (as
you probably noticed).

It seems that Hadoop's S3 file system is not really well suited for
what we want to do, and we are looking to drop it and replace it by
something direct (independent of Hadoop) in the coming release...

One essential thing is to make sure the "trash" feature is not activated in
the configuration, as it adds very high overhead to delete operations.
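
For what it's worth, the relevant Hadoop setting is typically fs.trash.interval
(in minutes; 0 disables the trash). A quick way to check what the Hadoop
configuration on the classpath resolves to, sketched here assuming the Hadoop
client libraries are available:

```
import org.apache.hadoop.conf.Configuration;

public class TrashCheck {

	public static void main(String[] args) {
		// Loads core-default.xml / core-site.xml from the classpath.
		Configuration conf = new Configuration();

		// fs.trash.interval is in minutes; 0 means the trash is disabled.
		long trashInterval = conf.getLong("fs.trash.interval", 0L);
		System.out.println("fs.trash.interval = " + trashInterval
				+ (trashInterval == 0L ? " (trash disabled)" : " (trash ENABLED)"));
	}
}
```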

Best,
Stephan


On Mon, Jul 24, 2017 at 7:56 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Prashant!
>
> I assume you are using Flink 1.3.0 or 1.3.1?
>
> Here are some things you can do:
>
>   - I would try and disable the incremental checkpointing for a start and
> see what happens then. That should reduce the number of files already.
>
>   - Is it possible for you to run a patched version of Flink? If yes, can
> you try to do the following: In the class "FileStateHandle", in the method
> "discardState()", remove the code around "FileUtils.deletePathIfEmpty(...)"
> - this is probably not working well when hitting too many S3 files.
>
>   -  You can delete old "completedCheckpointXXXYYY" files, but please do
> not delete the other two types, they are needed for HA recovery.
>
> Greetings,
> Stephan
>
>
> On Mon, Jul 24, 2017 at 3:46 AM, prashantnayak <
> prashant@intellifylearning.com> wrote:
>
>> Hi Xiaogang and Stephan
>>
>> We're continuing to test and have now set up the cluster to disable
>> incremental RocksDB checkpointing as well as increasing the checkpoint
>> interval from 30s to 120s  (not ideal really :-( )
>>
>> We'll run it with a large number of jobs and report back if this setup
>> shows
>> improvement.
>>
> Appreciate any other insights you might have on this problem.
>>
>> Thanks
>> Prashant
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14392.html
>>
>
>

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by prashantnayak <pr...@intellifylearning.com>.
Thanks Stephan

We can confirm that turning off RocksDB incremental checkpointing seems to
help and greatly reduces the number of files (from tens of thousands to low
thousands).

We still see that there is an inflection point when running > 50 jobs: the
appmaster stops deleting files from S3, which leads to unbounded growth
(slower without incremental checkpointing) of the S3 recovery directory.

We would be willing to try a patched version (we already have a fork)... just
to confirm, are you suggesting we delete the line "fs.delete(filePath, false);"
from discardState()?

```
	@Override
	public void discardState() throws Exception {

		FileSystem fs = getFileSystem();

		fs.delete(filePath, false);

		try {
			FileUtils.deletePathIfEmpty(fs, filePath.getParent());
		} catch (Exception ignored) {}
	}
```





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14452.html

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by Stephan Ewen <se...@apache.org>.
Hi Prashant!

I assume you are using Flink 1.3.0 or 1.3.1?

Here are some things you can do:

  - I would try and disable the incremental checkpointing for a start and
see what happens then. That should reduce the number of files already.

  - Is it possible for you to run a patched version of Flink? If yes, can
you try to do the following: In the class "FileStateHandle", in the method
"discardState()", remove the code around "FileUtils.deletePathIfEmpty(...)"
- this is probably not working well when hitting too many S3 files.

  -  You can delete old "completedCheckpointXXXYYY" files, but please do
not delete the other two types; they are needed for HA recovery.

Greetings,
Stephan


On Mon, Jul 24, 2017 at 3:46 AM, prashantnayak <
prashant@intellifylearning.com> wrote:

> Hi Xiaogang and Stephan
>
> We're continuing to test and have now set up the cluster to disable
> incremental RocksDB checkpointing as well as increasing the checkpoint
> interval from 30s to 120s  (not ideal really :-( )
>
> We'll run it with a large number of jobs and report back if this setup
> shows
> improvement.
>
> Appreciate any other insights you might have on this problem.
>
> Thanks
> Prashant
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14392.html
>

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by prashantnayak <pr...@intellifylearning.com>.
Hi Xiaogang and Stephan

We're continuing to test and have now set up the cluster to disable
incremental RocksDB checkpointing and to increase the checkpoint interval
from 30s to 120s (not ideal, really :-( ).
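
For reference, that setup corresponds to roughly the following job-side
configuration (a sketch against the Flink 1.3 APIs; the checkpoint path is a
placeholder):

```
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Checkpoint every 120s instead of every 30s.
		env.enableCheckpointing(120_000L);

		// RocksDB state backend with incremental checkpointing disabled
		// (second constructor argument = false). The S3 URI is a placeholder.
		env.setStateBackend(new RocksDBStateBackend("s3://our-bucket/flink/checkpoints", false));

		// ... job definition and env.execute() would follow here ...
	}
}
```

The number of retained checkpoints (2 in our case) and the HA settings live
in flink-conf.yaml rather than in the job code.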

We'll run it with a large number of jobs and report back if this setup shows
improvement.

Appreciate any other insights you might have on this problem.

Thanks
Prashant



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14392.html

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by prashantnayak <pr...@intellifylearning.com>.
Wanted to add: we took some stack traces and memory dumps... we will post
them or send them to you, but the stack traces indicate that the appmaster
is spending a lot of time in the AWS S3 library trying to list an S3
directory (the recovery directory?).

Thanks
Prashant



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14375.html

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by prashantnayak <pr...@intellifylearning.com>.
Hi Xiaogang and Stephan

Thank you for your response. Sorry about the delay in responding (I was
traveling).

We've been trying to figure out what triggers this, but your point about the
master not being able to delete files "in time" seems to be correct.

We've been testing out two different environments:
 1.  where we have a few jobs (< 10), but these jobs have processed a large
number of records (e.g. > 200-300 million)
 2.  where we have many jobs (> 40), but these jobs are processing a very low
number of records.

We observe that in (1), recovery and checkpoint directory growth is roughly
proportional to the number of jobs and the number of retained checkpoints
configured (we set it to 2).

We observe that in (2), growth of the recovery, checkpoint, and externalized
checkpoint directories is very fast. This environment eventually gets bogged
down, becomes unresponsive, and then dies.

To answer some of your other questions

  - Does it only occur with incremental checkpoints, or also with regular
checkpoints?
        we believe this occurs in both cases
  - How many checkpoints do you retain?
        we retain 2
  - Do you use externalized checkpoints?
        yes, and we set retention = 2 and retain_on_cancellation
  - Do you use a highly-available setup with ZooKeeper?
        yes, we do

We recently bumped up JobManager (appMaster) CPU and Heap in environment #2
(increased to 4 CPU, 2GB heap, 2.5GB memory allocated to Mesos container),
but that has had no effect.

Definitely appreciate any additional insight you might be able to provide. 
This is impeding us in production deployments.

Is there any way we can at least mitigate this growth? For example, we have
a script that can be cron'd to delete files in the S3 recovery directory
that are older than X hours. Is it OK to run this script and keep only the
last hour's worth of recovery files?

We notice that there are three types of files in the recovery directory:
   - completedCheckpointXXXYYY
   - mesosWorkerStoreXXXYYY
   - submittedJobGraphXXXXYYY

Is it OK to have the cron job prune all of these so we only keep the last
hour's worth, or perhaps just the completedCheckpoint files?

Happy to provide any additional detail you need.  Just let me know...

Thanks
Prashant



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14374.html

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by SHI Xiaogang <sh...@gmail.com>.
Hi Prashantnayak

Thanks a lot for reporting this problem. Can you provide more details to
help us address it?

I am guessing the master has to delete too many files when a checkpoint is
subsumed, which is very common in our cases. The number of files in the
recovery directory will increase if the master cannot delete these files in
time. This usually happens when the checkpoint interval is very small and the
degree of parallelism is very large.

Regards,
Xiaogang


2017-07-15 0:31 GMT+08:00 Stephan Ewen <se...@apache.org>:

> Hi!
>
> I am looping in Stefan and Xiaogang who worked a lot in incremental
> checkpointing.
>
> Some background on incremental checkpoints: Incremental checkpoints store
> "pieces" of the state (RocksDB ssTables) that are shared between
> checkpoints. Hence it naturally uses more files than no-incremental
> checkpoints.
>
> You could help us understand this with a few more details:
>   - Does it only occur with incremental checkpoints, or also with regular
> checkpoints?
>   - How many checkpoints do you retain?
>   - Do you use externalized checkpoints?
>   - Do you use a highly-available setup with ZooKeeper?
>
> Thanks,
> Stephan
>
>
>
> On Thu, Jul 13, 2017 at 10:43 PM, prashantnayak <
> prashant@intellifylearning.com> wrote:
>
>>
>> To add one more data point... it seems like the recovery directory is the
>> bottleneck somehow..  so if we delete the recovery directory and restart
>> the
>> job manager - it comes back and is responsive.
>>
>> Of course, we lose all jobs, since none can be recovered... and that is of
>> course not ideal.
>>
>> So the question seems to be why the recovery directory grows exponentially
>> in the first place.
>>
>> I can't imagine we're the only ones to see this... or we must be
>> configuring
>> something wrong while testing Flink 1.3.1
>>
>> Thanks for your help in advance
>>
>> Prashant
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14271.html
>>
>
>

Re: S3 recovery and checkpoint directories exhibit explosive growth

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I am looping in Stefan and Xiaogang, who have worked a lot on incremental
checkpointing.

Some background on incremental checkpoints: incremental checkpoints store
"pieces" of the state (RocksDB SSTables) that are shared between
checkpoints. Hence they naturally use more files than non-incremental
checkpoints.

You could help us understand this with a few more details:
  - Does it only occur with incremental checkpoints, or also with regular
checkpoints?
  - How many checkpoints do you retain?
  - Do you use externalized checkpoints?
  - Do you use a highly-available setup with ZooKeeper?

Thanks,
Stephan



On Thu, Jul 13, 2017 at 10:43 PM, prashantnayak <
prashant@intellifylearning.com> wrote:

>
> To add one more data point... it seems like the recovery directory is the
> bottleneck somehow..  so if we delete the recovery directory and restart
> the
> job manager - it comes back and is responsive.
>
> Of course, we lose all jobs, since none can be recovered... and that is of
> course not ideal.
>
> So the question seems to be why the recovery directory grows exponentially
> in the first place.
>
> I can't imagine we're the only ones to see this... or we must be
> configuring
> something wrong while testing Flink 1.3.1
>
> Thanks for your help in advance
>
> Prashant
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/S3-recovery-and-checkpoint-directories-exhibit-explosive-growth-tp14270p14271.html
>