Posted to dev@flink.apache.org by Artsem Semianenka <ar...@gmail.com> on 2018/08/20 11:40:32 UTC

Support Hadoop 2.6 for StreamingFileSink

Hi guys!
I have a question regarding the new StreamingFileSink (introduced in
version 1.6). We use this sink to write data in Parquet format, but I ran
into an issue when trying to run the job on a YARN cluster and save the
results to HDFS. In our case we use the latest Cloudera distribution
(CDH 5.15), which contains HDFS 2.6.0. This version does not support the
truncate method. I would like to create a pull request, but first I want
to ask your advice on how best to design this fix and what the reasoning
behind this decision was. I saw a similar PR for BucketingSink,
https://github.com/apache/flink/pull/6108 . Maybe I could also add support
for valid-length files for older Hadoop versions?

P.S. Unfortunately, CDH 5.15 (with Hadoop 2.6) is the latest version of
the Cloudera distribution, and we can't upgrade Hadoop to 2.7.

Best regards,
Artsem

Re: Support Hadoop 2.6 for StreamingFileSink

Posted by Artsem Semianenka <ar...@gmail.com>.
Hi guys,

I've created a Jira ticket for this issue and proposed a possible
solution, so that we can continue our discussion and develop a plan for
fixing it.

https://issues.apache.org/jira/browse/FLINK-10203

Cheers,
Artsem

On Tue, 21 Aug 2018 at 16:59, Artsem Semianenka <ar...@gmail.com>
wrote:

> Thanks for the reply, Kostas.
>
> But as long as there are distributions like Cloudera, whose latest
> version (5.15) is based on Hadoop 2.6, I and many other Cloudera users
> are obliged to use an older HDFS version. Moreover, I read a discussion
> on the Cloudera forum about moving to a more recent version of Hadoop,
> and the Cloudera guys said that they are not going to do that because
> they are concentrating on version 6, which is based on Hadoop 3.x.
> In that case, I doubt that Flink is ready to work with the latest
> Hadoop 3.x version.
> As a result, my company, as a Cloudera user, is trapped. We placed a bet
> on Flink but can't use it in our environment.
>
> I will think about your idea of a RecoverableStream without truncate for
> bulk encoders. But to tell the truth, I currently have no idea how to
> implement it, because idiomatically a RecoverableWriter should be able
> to recover from a specified pointer. In our case, for the Parquet
> BulkFormat, we don't need to recover; we should recreate the whole file
> from the checkpointed state. That doesn't look like a RecoverableWriter.
>
> Cheers,
> Artsem


-- 

Best regards,
Artsem Semianenka

Re: Support Hadoop 2.6 for StreamingFileSink

Posted by Artsem Semianenka <ar...@gmail.com>.
Thanks for the reply, Kostas.

But as long as there are distributions like Cloudera, whose latest version
(5.15) is based on Hadoop 2.6, I and many other Cloudera users are obliged
to use an older HDFS version. Moreover, I read a discussion on the
Cloudera forum about moving to a more recent version of Hadoop, and the
Cloudera guys said that they are not going to do that because they are
concentrating on version 6, which is based on Hadoop 3.x.
In that case, I doubt that Flink is ready to work with the latest Hadoop
3.x version.
As a result, my company, as a Cloudera user, is trapped. We placed a bet
on Flink but can't use it in our environment.

I will think about your idea of a RecoverableStream without truncate for
bulk encoders. But to tell the truth, I currently have no idea how to
implement it, because idiomatically a RecoverableWriter should be able to
recover from a specified pointer. In our case, for the Parquet BulkFormat,
we don't need to recover; we should recreate the whole file from the
checkpointed state. That doesn't look like a RecoverableWriter.

Cheers,
Artsem


On Tue, 21 Aug 2018 at 16:09, Kostas Kloudas <k....@data-artisans.com>
wrote:

> Hi Artsem,
>
> Till is correct in that getting rid of the “valid-length” file was a
> design decision
> for the new StreamingFileSink since the beginning. The motivation was that
> users were reporting that essentially it was very cumbersome to use.
>
> In general, when the BucketingSink gets deprecated, I could see a benefit
> in having a
> legacy recoverable stream just in case you are obliged to use an older
> HDFS version.
> But, at least for now, this would be useful only for row-wise encoders,
> and NOT for
> bulk-encoders like Parquet.
>
> The reason is that for now, when using bulk encoders you roll on every
> checkpoint.
> This implies that you do not need truncate, or the valid length file.
> Given this,
> you may only need to write a Recoverable stream that just does not
> truncate.
>
> Would you like to try it out and see if it works for your use case?
>
> Cheers,
> Kostas

-- 

Best regards,
Artsem Semianenka

Re: Support Hadoop 2.6 for StreamingFileSink

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Artsem,

Till is correct in that getting rid of the “valid-length” file was a design decision 
for the new StreamingFileSink since the beginning. The motivation was that 
users were reporting that essentially it was very cumbersome to use.
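
To illustrate why a valid-length file is awkward for consumers, here is a
minimal sketch in plain Java (not Flink or Hadoop code). It assumes a
companion file that stores the number of valid bytes as decimal text; the
class name, the method name, and that file format are assumptions made for
this example only. Every reader of the data would need logic like this
instead of simply opening the file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ValidLengthReader {

    /**
     * Reads only the valid prefix of a data file.
     * Assumption: lengthFile holds the valid byte count as decimal text.
     */
    public static byte[] readValidPrefix(Path dataFile, Path lengthFile) throws IOException {
        String text = new String(Files.readAllBytes(lengthFile), StandardCharsets.UTF_8).trim();
        long validLength = Long.parseLong(text);
        if (validLength < 0 || validLength > Files.size(dataFile) || validLength > Integer.MAX_VALUE) {
            throw new IOException("Invalid length " + validLength + " for " + dataFile);
        }
        byte[] result = new byte[(int) validLength];
        try (InputStream in = Files.newInputStream(dataFile)) {
            int offset = 0;
            // Keep reading until the whole valid prefix has been filled.
            while (offset < result.length) {
                int n = in.read(result, offset, result.length - offset);
                if (n < 0) {
                    throw new IOException("Unexpected end of file in " + dataFile);
                }
                offset += n;
            }
        }
        return result;
    }
}
```

Bytes after the recorded length are ignored, so a downstream tool that is
unaware of the companion file would read stale data.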

In general, when the BucketingSink gets deprecated, I could see a benefit in having a 
legacy recoverable stream just in case you are obliged to use an older HDFS version. 
But, at least for now, this would be useful only for row-wise encoders, and NOT for 
bulk-encoders like Parquet.

The reason is that for now, when using bulk encoders you roll on every checkpoint.
This implies that you do not need truncate, or the valid length file. Given this, 
you may only need to write a Recoverable stream that just does not truncate.
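
A toy model of this idea, in plain Java on the local file system, might
look like the sketch below. It is not the Flink RecoverableWriter API: the
class name, the ".inprogress" suffix, and the part naming are hypothetical,
and publishing a part directly in onCheckpoint() is a simplification.
Because a part file is closed at every checkpoint, recovery only has to
discard the unfinished part and never needs truncate.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class RollOnCheckpointSketch {
    private static final String IN_PROGRESS_SUFFIX = ".inprogress";

    private final Path dir;
    private int partCounter;
    private Path inProgress;
    private OutputStream out;

    public RollOnCheckpointSketch(Path dir) {
        this.dir = dir;
    }

    /** Appends data to the current in-progress part, opening one if needed. */
    public void write(byte[] data) throws IOException {
        if (out == null) {
            inProgress = dir.resolve("part-" + partCounter + IN_PROGRESS_SUFFIX);
            out = Files.newOutputStream(inProgress);
        }
        out.write(data);
    }

    /** Rolls on checkpoint: closes the part and publishes it under its final name. */
    public void onCheckpoint() throws IOException {
        if (out == null) {
            return;
        }
        out.close();
        out = null;
        Files.move(inProgress, dir.resolve("part-" + partCounter));
        partCounter++;
    }

    /** Closes the stream without publishing, as a failing task might leave it. */
    public void close() throws IOException {
        if (out != null) {
            out.close();
            out = null;
        }
    }

    /** Recovery: delete unfinished parts; published parts stay untouched. */
    public static void recover(Path dir) throws IOException {
        try (DirectoryStream<Path> leftovers =
                 Files.newDirectoryStream(dir, "*" + IN_PROGRESS_SUFFIX)) {
            for (Path p : leftovers) {
                Files.delete(p);
            }
        }
    }
}
```

Data written after the last checkpoint is lost on recovery in this model
and would have to be replayed from the source.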

Would you like to try it out and see if it works for your use case?

Cheers,
Kostas

> On Aug 21, 2018, at 1:58 PM, Artsem Semianenka <ar...@gmail.com> wrote:
> 
> Thanks for the reply, Till!
> 
> By the way, if Flink is going to support compatibility with Hadoop 2.6, I don't see another way to achieve it.
> As I mentioned before, one of the popular distributions, Cloudera, is still based on Hadoop 2.6, and it would be very sad if Flink did not support it.
> I really want to help the Flink community support this legacy version. But currently I see only one way to achieve it: emulate the 'truncate' logic by creating a new file with the needed length and replacing the old one.
> 
> Cheers,
> Artsem


Re: Support Hadoop 2.6 for StreamingFileSink

Posted by Artsem Semianenka <ar...@gmail.com>.
Thanks for the reply, Till!

By the way, if Flink is going to support compatibility with Hadoop 2.6, I
don't see another way to achieve it.
As I mentioned before, one of the popular distributions, Cloudera, is
still based on Hadoop 2.6, and it would be very sad if Flink did not
support it.
I really want to help the Flink community support this legacy version. But
currently I see only one way to achieve it: emulate the 'truncate' logic
by creating a new file with the needed length and replacing the old one.
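
The emulation described above can be sketched roughly as follows. This is
a minimal illustration on the local file system using java.nio as a
stand-in for HDFS; the class name, method name, and temporary file suffix
are hypothetical, and a real implementation would go through the Hadoop
FileSystem API instead.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class EmulatedTruncate {

    /** Shortens a file to the given length by copying its prefix and swapping files. */
    public static void truncateByCopy(Path file, long length) throws IOException {
        long size = Files.size(file);
        if (length < 0 || length > size) {
            throw new IllegalArgumentException("Length " + length + " out of range 0.." + size);
        }
        if (length == size) {
            return; // nothing to cut off
        }
        Path tmp = file.resolveSibling(file.getFileName() + ".truncated.tmp");
        try (InputStream in = Files.newInputStream(file);
             OutputStream out = Files.newOutputStream(tmp)) {
            byte[] buffer = new byte[8192];
            long remaining = length;
            // Copy exactly 'length' bytes from the old file to the new one.
            while (remaining > 0) {
                int n = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (n < 0) {
                    throw new IOException("Unexpected end of file in " + file);
                }
                out.write(buffer, 0, n);
                remaining -= n;
            }
        }
        // Replace the old file with the shortened copy.
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

The cost is a full copy of the valid prefix, and a crash between the copy
and the replace step leaves a temporary file behind that recovery would
have to clean up.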

Cheers,
Artsem

On Tue, 21 Aug 2018 at 14:41, Till Rohrmann <tr...@apache.org> wrote:

> Hi Artsem,
>
> if I recall correctly, then we explicitly decided to not support the valid
> file length files with the new StreamingFileSink because they are really
> hard to handle for the user. I've pulled Klou into this conversation who is
> more knowledgeable and can give you a bit more advice.
>
> Cheers,
> Till


-- 

Best regards,
Artsem Semianenka

Re: Support Hadoop 2.6 for StreamingFileSink

Posted by Till Rohrmann <tr...@apache.org>.
Hi Artsem,

if I recall correctly, then we explicitly decided to not support the valid
file length files with the new StreamingFileSink because they are really
hard to handle for the user. I've pulled Klou into this conversation who is
more knowledgeable and can give you a bit more advice.

Cheers,
Till

On Mon, Aug 20, 2018 at 2:53 PM Artsem Semianenka <ar...@gmail.com>
wrote:

> I have an idea: create a new version of the
> HadoopRecoverableFsDataOutputStream class (for example, named
> LegacyHadoopRecoverableFsDataOutputStream :) ) which works with
> valid-length files without invoking truncate, and modify the check in
> HadoopRecoverableWriter to use LegacyHadoopRecoverableFsDataOutputStream
> if the Hadoop version is lower than 2.7. I will try to provide a PR soon
> if there are no objections. I hope I am on the right track.

Re: Support Hadoop 2.6 for StreamingFileSink

Posted by Artsem Semianenka <ar...@gmail.com>.
I have an idea: create a new version of the
HadoopRecoverableFsDataOutputStream class (for example, named
LegacyHadoopRecoverableFsDataOutputStream :) ) which works with
valid-length files without invoking truncate, and modify the check in
HadoopRecoverableWriter to use LegacyHadoopRecoverableFsDataOutputStream
if the Hadoop version is lower than 2.7. I will try to provide a PR soon
if there are no objections. I hope I am on the right track.
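
The version check could look something like the sketch below. It is an
illustration only, not the actual check in HadoopRecoverableWriter: the
class and method names are hypothetical, and the version string is assumed
to have the form returned by Hadoop's VersionInfo.getVersion(), for
example "2.6.0-cdh5.15.0".

```java
public class TruncateSupport {

    /** Returns true if the version is 2.7 or newer, the first line with truncate. */
    public static boolean supportsTruncate(String hadoopVersion) {
        // Split on dots and dashes so vendor suffixes such as "-cdh5.15.0" are ignored.
        String[] parts = hadoopVersion.split("[.\\-]");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Cannot parse version: " + hadoopVersion);
        }
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        return major > 2 || (major == 2 && minor >= 7);
    }

    /** Picks the name of the stream implementation proposed in this thread. */
    public static String chooseStreamClass(String hadoopVersion) {
        return supportsTruncate(hadoopVersion)
            ? "HadoopRecoverableFsDataOutputStream"
            : "LegacyHadoopRecoverableFsDataOutputStream";
    }
}
```

A check based on the version number is simple, but a vendor could backport
truncate to an older version line; looking up the truncate method via
reflection would be an alternative that covers that case.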

On Mon, 20 Aug 2018 at 14:40, Artsem Semianenka <ar...@gmail.com>
wrote:

> Hi guys!
> I have a question regarding the new StreamingFileSink (introduced in
> version 1.6). We use this sink to write data in Parquet format, but I ran
> into an issue when trying to run the job on a YARN cluster and save the
> results to HDFS. In our case we use the latest Cloudera distribution
> (CDH 5.15), which contains HDFS 2.6.0. This version does not support the
> truncate method. I would like to create a pull request, but first I want
> to ask your advice on how best to design this fix and what the reasoning
> behind this decision was. I saw a similar PR for BucketingSink,
> https://github.com/apache/flink/pull/6108 . Maybe I could also add
> support for valid-length files for older Hadoop versions?
>
> P.S. Unfortunately, CDH 5.15 (with Hadoop 2.6) is the latest version of
> the Cloudera distribution, and we can't upgrade Hadoop to 2.7.
>
> Best regards,
> Artsem
>


-- 

Best regards,
Artsem Semianenka