Posted to dev@flink.apache.org by Kostas Kloudas <k....@data-artisans.com> on 2018/07/12 11:40:31 UTC

Re: Adding a part suffix setter to the BucketingSink

Hi Lakshmi,

Since Flink 1.5 you have the ability to set the part suffix.
As you said, you only want “.gzip” to be the suffix of the final (or “completed”) part files, which is exactly what is currently supported.

If you also want the intermediate files to have this suffix, you can set all the suffixes (in-progress, pending and final) to “.gzip”,
but then you also have to set appropriate prefixes so that Flink can distinguish completed from non-completed files (filenames
must not collide).
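
To make the collision point concrete, here is a minimal sketch in plain Java. It is a simplified, hypothetical model of how a file name could be built per state (prefix + base name + suffix); the class PartFileNames, its methods and the "part-<subtask>-<counter>" layout are assumptions for illustration, not the BucketingSink code, which may compose names differently.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Simplified, hypothetical model of per-state file naming.
 * NOT the BucketingSink implementation; the real sink may differ.
 */
final class PartFileNames {
    private final String partPrefix;
    private final String partSuffix;
    private final String inProgressPrefix;
    private final String inProgressSuffix;
    private final String pendingPrefix;
    private final String pendingSuffix;

    PartFileNames(String partPrefix, String partSuffix,
                  String inProgressPrefix, String inProgressSuffix,
                  String pendingPrefix, String pendingSuffix) {
        this.partPrefix = partPrefix;
        this.partSuffix = partSuffix;
        this.inProgressPrefix = inProgressPrefix;
        this.inProgressSuffix = inProgressSuffix;
        this.pendingPrefix = pendingPrefix;
        this.pendingSuffix = pendingSuffix;
    }

    /** Base name shared by all states, e.g. "part-0-3" (assumed layout). */
    private String base(int subtask, int counter) {
        return partPrefix + "-" + subtask + "-" + counter;
    }

    String finalName(int subtask, int counter) {
        return base(subtask, counter) + partSuffix;
    }

    String inProgressName(int subtask, int counter) {
        return inProgressPrefix + base(subtask, counter) + inProgressSuffix;
    }

    String pendingName(int subtask, int counter) {
        return pendingPrefix + base(subtask, counter) + pendingSuffix;
    }

    /** True if the three states map to three different file names. */
    boolean namesAreDistinct(int subtask, int counter) {
        Set<String> names = new HashSet<>(Arrays.asList(
                finalName(subtask, counter),
                inProgressName(subtask, counter),
                pendingName(subtask, counter)));
        return names.size() == 3;
    }

    public static void main(String[] args) {
        // Same suffix everywhere and the same prefix for both intermediate states.
        PartFileNames colliding =
                new PartFileNames("part", ".gzip", "_", ".gzip", "_", ".gzip");
        // Same suffix everywhere, but a distinct prefix per intermediate state.
        PartFileNames distinct =
                new PartFileNames("part", ".gzip", "_in-progress-", ".gzip", "_pending-", ".gzip");

        System.out.println(colliding.namesAreDistinct(0, 0));
        System.out.println(distinct.namesAreDistinct(0, 0));
        System.out.println(distinct.inProgressName(0, 0));
    }
}
```

In this model, sharing one suffix across all states is only safe when each state gets its own prefix; with identical prefixes the in-progress and pending names become indistinguishable.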

I would also recommend using the most recent stable version, 1.5.3, which also includes this bug fix:
https://issues.apache.org/jira/browse/FLINK-9603

I hope this helps,
Kostas


> On Apr 5, 2018, at 6:23 PM, Lakshmi Gururaja Rao <lr...@lyft.com> wrote:
> 
> I can see two ways of achieving this:
> 
> 1. Setting a suffix *only* for the completed part files. I don't
> necessarily think the suffix should be added for the intermediate files (as
> intermediate files should not really be ready for consumption by a
> downstream process?)
> 2. Be able to override this partPath name creation -
> https://github.com/apache/flink/blob/release-1.4.0/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L523
> . That way any user who needs to set a custom/dynamic part file name can
> still do so.
> 
> Do you think either of these options is feasible?
> 
> Thanks
> Lakshmi
> 
> On Tue, Apr 3, 2018 at 12:57 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
> 
>> So you want to be able to set a "global" suffix that should be appended to
>> all different kinds of files that the sink writes, including intermediate
>> files?
>> 
>> Aljoscha
>> 
>>> On 29. Mar 2018, at 16:59, lrao@lyft.com wrote:
>>> 
>>> Sorry, I meant "I don't see a way of doing this apart from setting a
>>> part file *suffix* with the required file extension."
>>> 
>>> 
>>> On 2018/03/29 14:55:43, lrao@lyft.com <lr...@lyft.com> wrote:
>>>> Currently the BucketingSink allows setting the part prefix, pending
>>>> prefix/suffix and in-progress prefix/suffix via setter methods. Can we also
>>>> support setting part suffixes?
>>>> An instance where this may be useful: I am currently writing GZIP
>>>> compressed output to S3 using the BucketingSink and I would want the
>>>> uploaded files to have a ".gz" or ".zip" extension (if the files do not
>>>> have such an extension, they are written as garbled bytes and don't get
>>>> rendered correctly for reading). I don't see a way of doing this apart from
>>>> setting a part file prefix with the required file extension.
>>>> 
>>>> Thanks
>>>> Lakshmi
>>>> 
>> 
>> 
> 
> 
> -- 
> *Lakshmi Gururaja Rao*
> SWE
> 217.778.7218 <+12177787218>
> [image: Lyft] <http://www.lyft.com/>


Re: Adding a part suffix setter to the BucketingSink

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Lakshmi,

I meant Flink 1.5.1 (not 1.5.3), which was recently released.

Cheers,
Kostas

> On Jul 12, 2018, at 7:34 PM, Lakshmi Gururaja Rao <lr...@lyft.com> wrote:
> 
> Hi Kostas,
> 
> Thank you for replying. I am already using the ability to set the part suffix.
> I was not aware of this issue: https://issues.apache.org/jira/browse/FLINK-9603.
> Thanks for pointing it out, I'll make sure to use the
> 1.5.3 version of the sink.
> 
> Thanks
> Lakshmi
> 
> 


Re: Adding a part suffix setter to the BucketingSink

Posted by Lakshmi Gururaja Rao <lr...@lyft.com>.
Hi Kostas,

Thank you for replying. I am already using the ability to set the part suffix.
I was not aware of this issue: https://issues.apache.org/jira/browse/FLINK-9603.
Thanks for pointing it out, I'll make sure to use the
1.5.3 version of the sink.

Thanks
Lakshmi


-- 
*Lakshmi Gururaja Rao*
SWE
217.778.7218 <+12177787218>
[image: Lyft] <http://www.lyft.com/>

Re: Adding a part suffix setter to the BucketingSink

Posted by vino yang <ya...@gmail.com>.
Hi Kostas, good job!
