Posted to user@flink.apache.org by Rinat <r....@cleverdata.ru> on 2017/10/20 09:08:01 UTC

Flink BucketingSink, subscribe on moving of file into final state

Hi All !

I’m trying to create a meta-info file that contains a link to a file created by the Flink BucketingSink.
At first I tried to implement my own org.apache.flink.streaming.connectors.fs.Writer that creates the meta file when its close method is called.
But I realized that this is not quite right: when the writer is closed, the file the data was written to is still in the in-progress state, and its name will change when it moves to the final state.
So creating any meta-info on writer close that links to the in-progress file will leave my system in an inconsistent state.

I looked through the BucketingSink sources and did not find an elegant way to subscribe to the moment a data file is moved into its final state.
Has anyone already run into the same issue and found an elegant way to solve it?

Also, maybe someone knows how this could be solved using other Flink tools or components; I haven’t been using Flink for long and may not know some of its features.

Thx.

Re: Flink BucketingSink, subscribe on moving of file into final state

Posted by Piotr Nowojski <pi...@data-artisans.com>.
You’re welcome. Unfortunately, I have not come across such a use case before.

Piotrek

> On 20 Oct 2017, at 13:47, Rinat <r....@cleverdata.ru> wrote:
> 
> Piotrek, thanks for your reply.
> 
> Yes, I’m now looking for the most suitable way to extend the BucketingSink functionality to handle the moment a file is moved into its final state.
> I thought that maybe someone had already implemented something like this, or knows another approach that would save me from copy/pasting the existing sink implementation ))
> 
> Thx !
> 
> 
>> On 20 Oct 2017, at 14:37, Piotr Nowojski <piotr@data-artisans.com> wrote:
>> 
>> Piotrek
> 


Re: Flink BucketingSink, subscribe on moving of file into final state

Posted by Rinat <r....@cleverdata.ru>.
Piotrek, thanks for your reply.

Yes, I’m now looking for the most suitable way to extend the BucketingSink functionality to handle the moment a file is moved into its final state.
I thought that maybe someone had already implemented something like this, or knows another approach that would save me from copy/pasting the existing sink implementation ))

Thx !


> On 20 Oct 2017, at 14:37, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> Piotrek


Re: Flink BucketingSink, subscribe on moving of file into final state

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Maybe you can just list the files in your basePath and filter out those that have in-progress or pending suffixes?
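
A minimal sketch of that listing approach, using plain java.nio on a local directory. The class name FinalFileLister is hypothetical, and the suffix constants are assumptions based on what I believe are the BucketingSink defaults; check them against your sink configuration. On HDFS the same filtering would go through the Hadoop FileSystem API instead.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FinalFileLister {
    // Assumed suffixes (believed to be the BucketingSink defaults); adjust if configured differently.
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";
    static final String VALID_LENGTH_SUFFIX = ".valid-length";

    // A file counts as final when its name carries none of the temporary suffixes.
    static boolean isFinal(Path file) {
        String name = file.getFileName().toString();
        return !name.endsWith(IN_PROGRESS_SUFFIX)
                && !name.endsWith(PENDING_SUFFIX)
                && !name.endsWith(VALID_LENGTH_SUFFIX);
    }

    // Walks basePath recursively and returns the regular files that look final, sorted by path.
    static List<Path> listFinalFiles(Path basePath) throws IOException {
        try (Stream<Path> files = Files.walk(basePath)) {
            return files
                    .filter(Files::isRegularFile)
                    .filter(FinalFileLister::isFinal)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }
}
```

Note that this is polling rather than a subscription: a file listed as final was final at the time of the listing, but nothing notifies you when the next one appears.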

I think you could wrap or implement your own Bucketer and track all the paths that it returns. However, some of those might be pending or in-progress files that will be committed in the future (or, in case of a crash, some of them might be left over and should be discarded).
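
The wrapping idea could look roughly like the sketch below. To keep it self-contained, SimpleBucketer is a hypothetical stand-in for Flink's Bucketer interface (the real one uses Hadoop Path objects and also receives a Clock), and TrackingBucketer and seenPaths are illustrative names, not Flink API.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class TrackingBucketerSketch {
    /** Hypothetical, simplified stand-in for Flink's Bucketer interface. */
    interface SimpleBucketer<T> {
        String getBucketPath(String basePath, T element);
    }

    /** Delegates to another bucketer and remembers every distinct path it returned. */
    static class TrackingBucketer<T> implements SimpleBucketer<T> {
        private final SimpleBucketer<T> delegate;
        private final Set<String> seen = ConcurrentHashMap.newKeySet();

        TrackingBucketer(SimpleBucketer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public String getBucketPath(String basePath, T element) {
            String path = delegate.getBucketPath(basePath, element);
            seen.add(path); // record the path; it may still hold pending or in-progress files
            return path;
        }

        /** Read-only view of the paths seen so far. */
        Set<String> seenPaths() {
            return Collections.unmodifiableSet(seen);
        }
    }
}
```

The tracked paths only say where files may appear; they still have to be checked for pending or in-progress files before anything is linked to them.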

Another possibility is to copy the code of BucketingSink and track the fs.rename calls that move a file to its final path (in:
notifyCheckpointComplete
handlePendingFilesForPreviousCheckpoints
handlePendingInProgressFile)
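
In a copied sink, the hook would sit right after each successful rename. The sketch below shows only that pattern on a local file system with java.nio; RenameHookSketch, moveToFinal and the onFinalized callback are hypothetical names, and the real sink renames through the Hadoop FileSystem rather than Files.move.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Consumer;

public class RenameHookSketch {
    // Moves a pending file to its final path and notifies the listener only after the move succeeded.
    static void moveToFinal(Path pending, Path finalPath, Consumer<Path> onFinalized) throws IOException {
        Files.move(pending, finalPath, StandardCopyOption.ATOMIC_MOVE);
        // The file now has its final name, so it is safe to write meta-info that links to it.
        onFinalized.accept(finalPath);
    }
}
```

The callback is where the meta-info file could be written, since at that point the data file already has its final name.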

Piotrek

> On 20 Oct 2017, at 11:08, Rinat <r....@cleverdata.ru> wrote:
> 
> Hi All !
> 
> I’m trying to create a meta-info file that contains a link to a file created by the Flink BucketingSink.
> At first I tried to implement my own org.apache.flink.streaming.connectors.fs.Writer that creates the meta file when its close method is called.
> But I realized that this is not quite right: when the writer is closed, the file the data was written to is still in the in-progress state, and its name will change when it moves to the final state.
> So creating any meta-info on writer close that links to the in-progress file will leave my system in an inconsistent state.
> 
> I looked through the BucketingSink sources and did not find an elegant way to subscribe to the moment a data file is moved into its final state.
> Has anyone already run into the same issue and found an elegant way to solve it?
> 
> Also, maybe someone knows how this could be solved using other Flink tools or components; I haven’t been using Flink for long and may not know some of its features.
> 
> Thx.