You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@arrow.apache.org by "Daniël Heres (Jira)" <ji...@apache.org> on 2021/01/01 12:50:00 UTC

[jira] [Commented] (ARROW-11030) [Rust] [DataFusion] HashJoinExec slow with many batches

    [ https://issues.apache.org/jira/browse/ARROW-11030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17257187#comment-17257187 ] 

Daniël Heres commented on ARROW-11030:
--------------------------------------

Added a PR here, still with MutableArrayData (so some optimization opportunities left) but with batches merged to one batch [https://github.com/apache/arrow/pull/9070. 

|https://github.com/apache/arrow/pull/9070]Collecting to one batch seems very promising, and removes a large amount of the remaining overhead when using smaller batches.

> [Rust] [DataFusion] HashJoinExec slow with many batches
> -------------------------------------------------------
>
>                 Key: ARROW-11030
>                 URL: https://issues.apache.org/jira/browse/ARROW-11030
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust - DataFusion
>            Reporter: Andy Grove
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.0.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Performance of joins slows down dramatically with smaller batches.
> The issue is related to slow performance of MutableDataArray::new() when passed a high number of batches. This happens when passing in all of the batches from the build side of the join and this happens once per build-side join key for each probe-side batch.
> It seems to get exponentially slower as the number of arrays increases even though the number of rows is the same.
> I modified hash_join.rs to have this debug code:
> {code:java}
> let start = Instant::now();
> let row_count: usize = arrays.iter().map(|arr| arr.len()).sum();
> let num_arrays = arrays.len();
> let mut mutable = MutableArrayData::new(arrays, true, capacity);
> if num_arrays > 0 {
>     debug!("MutableArrayData::new() with {} arrays containing {} rows took {} ms", num_arrays, row_count, start.elapsed().as_millis());
> } {code}
> Batch size 131072:
> {code:java}
> MutableArrayData::new() with 4584 arrays containing 3115341 rows took 1 ms
> MutableArrayData::new() with 4584 arrays containing 3115341 rows took 1 ms
> MutableArrayData::new() with 4584 arrays containing 3115341 rows took 1 ms {code}
> Batch size 16384:
> {code:java}
> MutableArrayData::new() with 36624 arrays containing 3115341 rows took 19 ms
> MutableArrayData::new() with 36624 arrays containing 3115341 rows took 16 ms
> MutableArrayData::new() with 36624 arrays containing 3115341 rows took 17 ms {code}
> Batch size 4096:
> {code:java}
> MutableArrayData::new() with 146496 arrays containing 3115341 rows took 88 ms
> MutableArrayData::new() with 146496 arrays containing 3115341 rows took 89 ms
> MutableArrayData::new() with 146496 arrays containing 3115341 rows took 88 ms {code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)