Posted to dev@hudi.apache.org by Vinoth Chandar <vi...@apache.org> on 2020/03/02 18:16:04 UTC

Re: Re: Re: [DISCUSS] Improve the merge performance for cow

Hi Lamber-ken,

If you agree that reduceByKey() will shuffle data, then it would serialize
and deserialize anyway, correct?
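
To make the question above concrete, here is a minimal sketch in plain
Python (not Spark or Hudi code) of a shuffle-backed reduce by key. The
function name reduce_by_key and the num_reducers parameter are made up for
illustration, and pickle stands in for whatever serializer the engine uses.
It is deliberately simplified: it serializes every record and ignores
map-side combining.

```python
import pickle
from collections import defaultdict


def reduce_by_key(partitions, reduce_fn, num_reducers=2):
    """Toy model of a shuffle: records are serialized to reach their reducer."""
    shuffled = defaultdict(list)  # reducer index -> serialized records
    serialized = 0
    for part in partitions:
        for key, value in part:
            # Crossing a partition boundary means writing the record out as bytes.
            shuffled[hash(key) % num_reducers].append(pickle.dumps((key, value)))
            serialized += 1

    merged = {}
    for blobs in shuffled.values():
        for blob in blobs:
            key, value = pickle.loads(blob)  # read back on the reduce side
            merged[key] = reduce_fn(merged[key], value) if key in merged else value
    return merged, serialized


# Values are (timestamp, payload); the reducer keeps the latest one per key.
parts = [[("k1", (1, "a")), ("k2", (1, "b"))], [("k1", (2, "c"))]]
merged, count = reduce_by_key(parts, max)
```

In this toy model all three input records are serialized once and
deserialized once, even though no spillable map is involved.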

I am not denying that this may be a valid approach. But we need much more
rigorous testing, and potentially need to implement both approaches
side-by-side to compare. IMO we cannot draw conclusions from the one test we
had, where the metadata overhead was so high. The first step would be to
introduce abstractions so that these two ways can be implemented
side-by-side and controlled by a flag.
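
As a rough illustration of what "side-by-side and controlled by a flag"
could look like, here is a minimal sketch in plain Python. Every name in it
(merge_with_map, merge_with_reduce, MERGE_STRATEGIES, the strategy
parameter) is hypothetical and not a Hudi API; records are simplified to
(key, value) pairs, and keys are assumed unique within each input.

```python
def merge_with_map(old_records, new_records):
    """Map-style merge: index incoming records by key, then stream the old file."""
    incoming = dict(new_records)  # stands in for the spillable map
    out = []
    for key, value in old_records:
        # An incoming record with the same key replaces the old one (update).
        out.append((key, incoming.pop(key, value)))
    out.extend(incoming.items())  # whatever is left are inserts
    return out


def merge_with_reduce(old_records, new_records):
    """Reduce-style merge: combine old and new records, the later value wins."""
    merged = {}
    for key, value in list(old_records) + list(new_records):
        merged[key] = value
    return list(merged.items())


# Hypothetical flag values mapped to implementations.
MERGE_STRATEGIES = {"map": merge_with_map, "reduce": merge_with_reduce}


def merge(old_records, new_records, strategy="map"):
    """Dispatch to the merge implementation selected by the flag."""
    return MERGE_STRATEGIES[strategy](old_records, new_records)


old = [("a", 1), ("b", 2)]
new = [("b", 20), ("c", 3)]
assert merge(old, new, "map") == merge(old, new, "reduce")
```

Keeping both implementations behind one entry point is what would let the
same workload be benchmarked against either path.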

Also, let's separate the RDD vs DataFrame discussion out of this, since
that's orthogonal anyway.

Thanks
Vinoth


On Fri, Feb 28, 2020 at 11:02 AM lamberken <la...@163.com> wrote:

>
>
> Hi vinoth,
>
>
> Thanks for reviewing the initial design :)
> I know there are many problems at present (e.g. shuffling, parallelism
> issues). We can discuss the practicability of the idea first.
>
>
> > ExternalSpillableMap itself was not the issue right, the serialization was
> Right, the new design will not have this issue, because it will not use
> the map at all.
>
>
> > This map is also used on the query side
> Right, the proposal aims to improve the merge performance of cow table.
>
>
> > HoodieWriteClient.java#L546 We cannot collect() the recordRDD at all ... OOM driver
> Here, in order to get the Map<fileId, partition>, distinct() is executed
> before collect(), so the result is very small.
> Also, it can be implemented in FileSystemViewManager, and lazy loading is
> also OK.
>
>
> > Doesn't this move the problem to tuning spark simply?
> There are two serious performance problems in the old merge logic.
> 1. When upserting many records, it serializes the records to disk, then
> deserializes them when merging with the old records.
> 2. Only a single thread consumes the old records one by one and then
> handles the merge process, which is much less efficient.
>
>
> > doing a sort based merge repartitionAndSortWithinPartitions
> Trying to understand your point :)
>
>
> Compared to the old version, there may be several improvements:
> 1. It uses Spark built-in operators, so it's easier to understand.
> 2. During my testing, the upsert performance doubled.
> 3. If possible, we can write data in batches by using DataFrame in the
> future.
>
>
> [1]
> https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
>
>
> Best,
> Lamber-Ken
>
>
>
>
>
>
>
>
>
> At 2020-02-29 01:40:36, "Vinoth Chandar" <vi...@apache.org> wrote:
> >Doesn't this move the problem to tuning spark simply? The
> >ExternalSpillableMap itself was not the issue right, the serialization
> >was. This map is also used on the query side btw, where we need something
> >like that.
> >
> >I took a pass at the code. I think we are shuffling data again for the
> >reduceByKey step in this approach? For MOR, note that this is unnecessary
> >since we simply log the records and there is no merge. This approach might
> >have better merge parallelism when merging is costly. But ultimately,
> >our write parallelism is limited by the number of affected files, right?
> >So it's not clear to me that this would always be a win.
> >
> >On the code itself,
> >
> >https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java#L546
> >We cannot collect() the recordRDD at all. It will OOM the driver. :)
> >
> >Orthogonally, one thing we are thinking of is doing a sort-based merge,
> >i.e. repartitionAndSortWithinPartitions() the input records to the merge
> >handle, and if the file is also sorted on disk (it's not today), then we
> >can use a merge-sort-like algorithm to perform the merge. We can probably
> >write code to bear the one-time sorting costs. This will eliminate the
> >need for memory for merging altogether.
> >
> >On Wed, Feb 26, 2020 at 10:11 PM lamberken <la...@163.com> wrote:
> >
> >>
> >>
> >> hi, vinoth
> >>
> >>
> >> > What do you mean by spark built in operators
> >> We may no longer need to depend on ExternalSpillableMap when upserting
> >> to a cow table.
> >>
> >>
> >> > Are you suggesting that we perform the merging in sql
> >> No, just use Spark built-in operators like mapToPair, reduceByKey, etc.
> >>
> >>
> >> The details are described in this doc [1]; I have also finished a draft
> >> implementation and test.
> >> It mainly modifies the HoodieWriteClient#upsertRecordsInternal method.
> >>
> >>
> >> [1]
> >> https://docs.google.com/document/d/1-EHHfemtwtX2rSySaPMjeOAUkg5xfqJCKLAETZHa7Qw/edit?usp=sharing
> >> [2]
> >> https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
> >>
> >>
> >>
> >> At 2020-02-27 13:45:57, "Vinoth Chandar" <vi...@apache.org> wrote:
> >> >Hi lamber-ken,
> >> >
> >> >Thanks for this. I am not quite following the proposal. What do you
> >> >mean by spark built in operators? Don't we already use the RDD-based
> >> >Spark operations?
> >> >
> >> >Are you suggesting that we perform the merging in sql? Not following.
> >> >Please clarify.
> >> >
> >> >On Wed, Feb 26, 2020 at 10:08 AM lamberken <la...@163.com> wrote:
> >> >
> >> >>
> >> >>
> >> >> Hi guys,
> >> >>
> >> >>
> >> >> Motivation
> >> >> Improve the merge performance for cow tables on upsert by handling
> >> >> the merge operation with Spark built-in operators.
> >> >>
> >> >>
> >> >> Background
> >> >> When doing an upsert operation, for each bucket, hudi needs to put
> >> >> the new input elements into an in-memory cache map, and will
> >> >> need an external map that spills content to disk when there is
> >> >> insufficient space for it to grow.
> >> >>
> >> >>
> >> >> There are several performance issues:
> >> >> 1. We may need an external disk map, which serializes / deserializes records
> >> >> 2. Only a single thread does the I/O operation when checking
> >> >> 3. We can't take advantage of built-in Spark operators
> >> >>
> >> >>
> >> >> Based on the above, I reworked the merge logic and did a draft test.
> >> >> If you are also interested in this, please go ahead with this doc [1];
> >> >> any suggestions are welcome. :)
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> Thanks,
> >> >> Lamber-Ken
> >> >>
> >> >>
> >> >> [1]
> >> >> https://docs.google.com/document/d/1-EHHfemtwtX2rSySaPMjeOAUkg5xfqJCKLAETZHa7Qw/edit?usp=sharing
> >> >>
> >> >>
> >>
>
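
The sort-based merge suggested in the quoted thread above
(repartitionAndSortWithinPartitions() on the input, plus a base file that is
also sorted on disk) could be sketched roughly as follows. This is plain
Python rather than Spark or Hudi code; the function name sorted_merge is
made up for illustration, records are simplified to (key, value) pairs, and
both inputs are assumed to be sorted by key with unique keys.

```python
def sorted_merge(old_sorted, new_sorted):
    """Merge two key-sorted streams of (key, value); incoming wins on equal keys."""
    old_it, new_it = iter(old_sorted), iter(new_sorted)
    old, new = next(old_it, None), next(new_it, None)
    while old is not None or new is not None:
        if new is None or (old is not None and old[0] < new[0]):
            yield old  # key only exists in the old file: carry it over
            old = next(old_it, None)
        elif old is None or new[0] < old[0]:
            yield new  # key only exists in the input: insert
            new = next(new_it, None)
        else:
            yield new  # same key on both sides: update
            old, new = next(old_it, None), next(new_it, None)


old_file = [(1, "a"), (3, "c")]
incoming = [(2, "b2"), (3, "c2"), (4, "d")]
merged = list(sorted_merge(old_file, incoming))
```

Only one record from each side is held at a time, which is the sense in
which this kind of merge would not need a memory-resident (or spillable) map.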

Re: Re: Re: Re: [DISCUSS] Improve the merge performance for cow

Posted by lamberken <la...@163.com>.

Hi Vinoth,


Yes, it's incorrect to draw a conclusion from only one test.


It's just a new idea to improve the merge performance; it's not the best.
E.g. when reading old records, there is a series of conversion operations (Row to GenericRecord to HoodieRecord), etc.


> Also let's separate the RDD vs DataFrame discussion out of this
Okay. I mentioned it here because, if we use Dataset/DataFrame, we may not need so many conversions in the hudi project.
IMO, the new merge program would be clearer, and it would be a great project to do.
As you suggested, let's separate it out of this.


Best,
Lamber-Ken




