Posted to user@flink.apache.org by Maciek Próchniak <mp...@touk.pl> on 2016/10/18 07:40:32 UTC

ContinuousFileMonitoringFunction - deleting file after processing

Hi,

we want to monitor an HDFS (or local) directory, read the CSV files that 
appear and, after successful processing, delete them (mainly so that we 
don't run out of disk space...)

I'm not quite sure how to achieve this with the current implementation. 
Previously, when we read binary data (unsplittable files), we made a small 
hack and deleted them in our FileInputFormat. But now we want to use 
splits, and detecting which split is 'the last one' is no longer so 
obvious. Of course, it's also problematic when it comes to checkpointing...

So my question is: is there an idiomatic way of deleting processed files?


thanks,

maciek


Re: ContinuousFileMonitoringFunction - deleting file after processing

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Maciek,

The first point seems interesting and we should definitely look into it, also for other filesystems, e.g. HDFS.
It would be nice if we could together find a more “one-size-fits-all” solution, 
because the local fs rounds to a second but other filesystems may have different strategies.
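
For reference, the state change described in the first point (keeping the last modification time together with the set of files that share exactly that time) might be sketched roughly as follows. This is only an illustration, not the actual ContinuousFileMonitoringFunction code: the class name ModTimeTracker and the method shouldProcess are hypothetical, and the sketch assumes files are offered in ascending modification-time order and that no new file ever shows up with an older timestamp.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical helper, not Flink code: remembers the newest modification time
 * seen so far plus the paths already forwarded with exactly that timestamp.
 */
class ModTimeTracker {
    private long lastModificationTime = Long.MIN_VALUE;
    private final Set<String> filesAtLastModTime = new HashSet<>();

    /** Returns true if the file has not been forwarded yet, and records it. */
    boolean shouldProcess(String path, long modTime) {
        if (modTime < lastModificationTime) {
            // Older than the newest timestamp seen: assumed to be handled already.
            return false;
        }
        if (modTime == lastModificationTime) {
            // Same timestamp: the file is new only if its path was not seen before.
            return filesAtLastModTime.add(path);
        }
        // Strictly newer timestamp: advance and start a fresh set.
        lastModificationTime = modTime;
        filesAtLastModTime.clear();
        filesAtLastModTime.add(path);
        return true;
    }
}
```

With a coarse (e.g. one-second) timestamp, a second file that appears with the same modification time as the last one is still picked up, because the comparison falls back to the set of paths.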

Now for the second point, the problem with introducing another operator with a parallelism of 1 is that 
it would probably become another point of congestion. But it is definitely a point worth discussing.

I have already started a discussion thread in the dev mailing list.
The title is: [DISCUSS] Handling event-time in continuous file processing.

I would suggest moving the discussion there so that other people can also participate and contribute 
their own use cases and ideas.

When this is finalized, it would be more than appreciated if you could also help with the implementation.
If you still want to, of course ;)

Thanks,
Kostas

> On Nov 26, 2016, at 9:40 PM, Maciek Próchniak <mp...@touk.pl> wrote:
> 
> Hi Kostas,
> 
> I didn't see any discussion on the dev mailing list, so I'd like to share our problems/solutions (we had a busy month...;)
> 
> 1. we refactored ContinuousFileMonitoringFunction so that the state includes not only lastModificationTime, but also the list of files that have exactly this modification time. This way we're sure that we don't lose any files that appear later with the same modification time. It turned out that for the local file system this is quite important, as modificationTime in Java can have one-second resolution (see e.g. http://stackoverflow.com/questions/24804618/get-file-mtime-with-millisecond-resolution-from-java - we learned it the hard way...)
> 
> 2. we are able to safely delete files in the following way:
>    - in ContinuousFileReaderOperator we emit an additional marker event after the end of each split
>    - the split contains information about how many splits are in the file
>    - we added an additional operator of parallelism 1 after ContinuousFileReaderOperator which tracks these marker events so that it knows when all splits from a file have been processed, and deletes finished files after the appropriate checkpoints have been completed.
> 
> If you & other committers find these ideas ok, I can prepare jiras and pull requests. While the first point is pretty straightforward IMHO, I'd like to get some feedback on the second one.
> 
> thanks,
> maciek


Re: ContinuousFileMonitoringFunction - deleting file after processing

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Maciek,

Thanks for bringing this up again and sorry for not opening the discussion yet.

I will check it out and get back to you during the week.

Kostas

> On Nov 26, 2016, at 9:40 PM, Maciek Próchniak <mp...@touk.pl> wrote:
> 
> Hi Kostas,
> 
> I didn't see any discussion on the dev mailing list, so I'd like to share our problems/solutions (we had a busy month...;)
> 
> 1. we refactored ContinuousFileMonitoringFunction so that the state includes not only lastModificationTime, but also the list of files that have exactly this modification time. This way we're sure that we don't lose any files that appear later with the same modification time. It turned out that for the local file system this is quite important, as modificationTime in Java can have one-second resolution (see e.g. http://stackoverflow.com/questions/24804618/get-file-mtime-with-millisecond-resolution-from-java - we learned it the hard way...)
> 
> 2. we are able to safely delete files in the following way:
>    - in ContinuousFileReaderOperator we emit an additional marker event after the end of each split
>    - the split contains information about how many splits are in the file
>    - we added an additional operator of parallelism 1 after ContinuousFileReaderOperator which tracks these marker events so that it knows when all splits from a file have been processed, and deletes finished files after the appropriate checkpoints have been completed.
> 
> If you & other committers find these ideas ok, I can prepare jiras and pull requests. While the first point is pretty straightforward IMHO, I'd like to get some feedback on the second one.
> 
> thanks,
> maciek


Re: ContinuousFileMonitoringFunction - deleting file after processing

Posted by Maciek Próchniak <mp...@touk.pl>.
Hi Kostas,

I didn't see any discussion on the dev mailing list, so I'd like to share 
our problems/solutions (we had a busy month...;)

1. we refactored ContinuousFileMonitoringFunction so that the state includes 
not only lastModificationTime, but also the list of files that have exactly 
this modification time. This way we're sure that we don't lose any 
files that appear later with the same modification time. It turned out that 
for the local file system this is quite important, as modificationTime in 
Java can have one-second resolution (see e.g. 
http://stackoverflow.com/questions/24804618/get-file-mtime-with-millisecond-resolution-from-java 
- we learned it the hard way...)

2. we are able to safely delete files in the following way:
     - in ContinuousFileReaderOperator we emit an additional marker event 
after the end of each split
     - the split contains information about how many splits are in the file
     - we added an additional operator of parallelism 1 after 
ContinuousFileReaderOperator which tracks these marker events so that it 
knows when all splits from a file have been processed, and deletes finished 
files after the appropriate checkpoints have been completed.
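
The bookkeeping in the second point could be sketched roughly as below. This is a simplified illustration rather than the code we run: SplitCompletionTracker, onSplitFinished and drainFinishedFiles are hypothetical names, and the sketch assumes each marker event carries the file path and the total number of splits of that file. It leaves out the Flink operator plumbing and the wait for a completed checkpoint before the actual delete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch, not Flink code: counts end-of-split markers per file. */
class SplitCompletionTracker {
    // file path -> number of end-of-split markers seen so far
    private final Map<String, Integer> finishedSplits = new HashMap<>();
    // files for which every split has been reported
    private final List<String> finishedFiles = new ArrayList<>();

    /** Called once per marker event; totalSplits is carried by the split itself. */
    void onSplitFinished(String path, int totalSplits) {
        int seen = finishedSplits.merge(path, 1, Integer::sum);
        if (seen == totalSplits) {
            // All splits of this file are done, regardless of arrival order.
            finishedSplits.remove(path);
            finishedFiles.add(path);
        }
    }

    /** Returns the files that are fully processed and forgets them. */
    List<String> drainFinishedFiles() {
        List<String> result = new ArrayList<>(finishedFiles);
        finishedFiles.clear();
        return result;
    }
}
```

Because the tracker only counts markers, it does not matter in which order the parallel readers finish their splits; the drained files would then be held until a checkpoint that covers them has completed.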

If you & other committers find these ideas ok, I can prepare jiras and 
pull requests. While the first point is pretty straightforward IMHO, I'd 
like to get some feedback on the second one.

thanks,
maciek

On 18/10/2016 11:52, Kostas Kloudas wrote:
> Hi Maciek,
>
> I agree with you that 1ms is often too long :P
>
> This is the reason why I will open a discussion to have
> all the ideas / requirements / shortcomings in a single place.
> This way the community can track and influence what
> is coming next.
>
> Hopefully I will do it in the afternoon and I will send you
> the discussion thread.
>
> Cheers,
> Kostas


Re: ContinuousFileMonitoringFunction - deleting file after processing

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Maciek,

I agree with you that 1ms is often too long :P

This is the reason why I will open a discussion to have
all the ideas / requirements / shortcomings in a single place.
This way the community can track and influence what
is coming next. 

Hopefully I will do it in the afternoon and I will send you 
the discussion thread.

Cheers,
Kostas

> On Oct 18, 2016, at 11:43 AM, Maciek Próchniak <mp...@touk.pl> wrote:
> 
> Hi Kostas,
> 
> thanks for the quick answer.
> 
> I wouldn't dare to delete files in InputFormat if they were split and processed in parallel...
> 
> As for using notifyCheckpointComplete - thanks for the suggestion, it looks pretty interesting, I'll try it out. Although I wonder a bit if relying only on the modification timestamp is enough - many things may happen in one ms :)
> 
> thanks,
> 
> maciek


Re: ContinuousFileMonitoringFunction - deleting file after processing

Posted by Maciek Próchniak <mp...@touk.pl>.
Hi Kostas,

thanks for the quick answer.

I wouldn't dare to delete files in InputFormat if they were split and 
processed in parallel...

As for using notifyCheckpointComplete - thanks for the suggestion, it looks 
pretty interesting, I'll try it out. Although I wonder a bit if 
relying only on the modification timestamp is enough - many things may 
happen in one ms :)

thanks,

maciek


On 18/10/2016 11:14, Kostas Kloudas wrote:
> Hi Maciek,
>
> Just a follow-up on the previous email: given that splits are read in parallel, when the
> ContinuousFileMonitoringFunction forwards the last split, it does not mean that this
> final split is going to be processed last. If the node it gets assigned to is fast enough,
> it may be processed before the others.
>
> The assumption that the last split forwarded is also the last one processed only holds if you have a parallelism of 1.
>
> Cheers,
> Kostas


Re: ContinuousFileMonitoringFunction - deleting file after processing

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Maciek,

Just a follow-up on the previous email: given that splits are read in parallel, when the 
ContinuousFileMonitoringFunction forwards the last split, it does not mean that this 
final split is going to be processed last. If the node it gets assigned to is fast enough,
it may be processed before the others. 

The assumption that the last split forwarded is also the last one processed only holds if you have a parallelism of 1.

Cheers,
Kostas

> On Oct 18, 2016, at 11:05 AM, Kostas Kloudas <k....@data-artisans.com> wrote:
> 
> Hi Maciek,
> 
> Currently this functionality is not supported, but it seems like a good addition.
> Actually, given that the feature is rather new, we were thinking of opening a discussion 
> on the dev mailing list in order to 
> 
> i) discuss some current limitations of the Continuous File Processing source
> ii) see how people use it and adjust our features accordingly
> 
> I will let you know as soon as I open this thread.
> 
> By the way, for your use case, we should probably have a callback in notifyCheckpointComplete()
> that informs the source that a given checkpoint was successfully performed, and then 
> we can purge the already processed files. This can be a good solution.
> 
> Thanks,
> Kostas


Re: ContinuousFileMonitoringFunction - deleting file after processing

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Maciek,

Currently this functionality is not supported, but it seems like a good addition.
Actually, given that the feature is rather new, we were thinking of opening a discussion 
on the dev mailing list in order to 

i) discuss some current limitations of the Continuous File Processing source
ii) see how people use it and adjust our features accordingly

I will let you know as soon as I open this thread.

By the way, for your use case, we should probably have a callback in notifyCheckpointComplete()
that informs the source that a given checkpoint was successfully performed, and then 
we can purge the already processed files. This can be a good solution.
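
To make the idea a bit more concrete, a minimal sketch of such checkpoint-driven purging could look like the following. It is not existing Flink functionality: the interface CheckpointCompleteCallback is a local stand-in that only mirrors the shape of notifyCheckpointComplete(long), and ProcessedFilePurger, fileProcessed and snapshotState are hypothetical names. Actual deletion is replaced by collecting paths in a list so the sketch stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Local stand-in that mirrors the shape of the checkpoint-complete callback. */
interface CheckpointCompleteCallback {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

/** Hypothetical sketch: defers deletion until a covering checkpoint completes. */
class ProcessedFilePurger implements CheckpointCompleteCallback {
    private final List<String> processedSinceLastSnapshot = new ArrayList<>();
    // checkpoint id -> files that were fully processed before that checkpoint
    private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
    // stands in for real file deletion, which a job would do via its file system
    final List<String> deleted = new ArrayList<>();

    void fileProcessed(String path) {
        processedSinceLastSnapshot.add(path);
    }

    /** Called when a checkpoint is taken: bind finished files to that checkpoint. */
    void snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(processedSinceLastSnapshot));
        processedSinceLastSnapshot.clear();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Everything bound to this or an earlier checkpoint is now safe to purge.
        Map<Long, List<String>> covered = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<String> files : covered.values()) {
            deleted.addAll(files);
        }
        covered.clear(); // the view is backed by the map, so this drops the entries
    }
}
```

The point of binding files to a checkpoint id is that a file is only purged once a checkpoint taken after its processing has been confirmed, so a restore never needs a file that is already gone.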

Thanks,
Kostas

> On Oct 18, 2016, at 9:40 AM, Maciek Próchniak <mp...@touk.pl> wrote:
> 
> Hi,
> 
> we want to monitor an HDFS (or local) directory, read the CSV files that appear and, after successful processing, delete them (mainly so that we don't run out of disk space...)
> 
> I'm not quite sure how to achieve this with the current implementation. Previously, when we read binary data (unsplittable files), we made a small hack and deleted them in our FileInputFormat. But now we want to use splits, and detecting which split is 'the last one' is no longer so obvious. Of course, it's also problematic when it comes to checkpointing...
> 
> So my question is: is there an idiomatic way of deleting processed files?
> 
> 
> thanks,
> 
> maciek
>