Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/28 22:18:11 UTC

[GitHub] [arrow] andygrove opened a new pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

andygrove opened a new pull request #9036:
URL: https://github.com/apache/arrow/pull/9036


   I am investigating why join performance is so bad with smaller batch sizes (see https://issues.apache.org/jira/browse/ARROW-11030), and this is one optimization I have found so far that helps a bit.
   
   Prior to this PR, we used the size of the left or right batches to guess the capacity of output batches, which results in a lot of over-allocation in some cases. For TPC-H q12 at SF=100, I see vectors created with a capacity of ~3,000,000 (the size of the build side of the join) that we then populate with only ~700 entries.
   
   This PR attempts to learn a good capacity based on previously processed batches.
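
A minimal sketch of what learning a capacity from earlier batches could look like. The `CapacityEstimator` type, its method names, and the `fallback` argument are hypothetical and not taken from this PR; the headroom of `1024` mirrors the value discussed in the review comments, and using a running average is an assumption.

```rust
/// Hypothetical helper (not the PR's code): remembers how many rows earlier
/// output batches contained and uses that to guess the next capacity.
#[derive(Debug, Default)]
struct CapacityEstimator {
    num_output_batches: usize,
    total_output_rows: usize,
}

impl CapacityEstimator {
    /// Extra slots added on top of the running average (assumed value).
    const HEADROOM: usize = 1024;

    /// Capacity to reserve for the next output batch.
    /// `fallback` is used only before any batch has been observed.
    fn next_capacity(&self, fallback: usize) -> usize {
        if self.num_output_batches == 0 {
            fallback
        } else {
            self.total_output_rows / self.num_output_batches + Self::HEADROOM
        }
    }

    /// Record the number of rows actually produced for one output batch.
    fn record(&mut self, output_rows: usize) {
        self.num_output_batches += 1;
        self.total_output_rows += output_rows;
    }
}
```

With this shape, the first batch would still use whatever guess the caller passes as `fallback`, and later batches would reserve roughly the average observed output size plus the headroom.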
   
   Here are query times in seconds at different batch sizes:
   
   Batch Size | Master | This PR
   -- | -- | --
   16384 | 189.6 | 158.0
   32768 | 61.9 | 47.2
   65536 | 28.2 | 21.4
   131072 | 19.0 | 15.6
   
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] Dandandan edited a comment on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752060487


   An important source of slowness seems to be the use (and inefficiency) of creating the `MutableArrayData` structure. In profiling I see a lot of time spent in `build_extend`, `freeze`, etc.
   
   Changing the piece of code to generate a `Vec<&ArrayData>` directly gives a ~10-20% speedup locally on batches of size 1000 on your branch @andygrove :
   ```rust
           let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) {
               Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
               Err(_) => {
                   match secondary[0].schema().index_of(field.name()) {
                       Ok(i) => Ok((false, secondary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
                       _ => Err(DataFusionError::Internal(
                           format!("During execution, the column {} was not found in either the left or the right side of the join", field.name())
                       ))
                   }
               }
           }.map_err(DataFusionError::into_arrow_external_error)?;
   ```





[GitHub] [arrow] Dandandan commented on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752122091


   I think a further speedup could come from moving the construction of the left (build-side) `Vec<&ArrayData>` so that it is created only once, rather than for each right batch. Currently, making the batch size smaller means the build-side Vec is built more often and also contains more (smaller) batches itself, which could explain (part of) the big slowdown on smaller batches.
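
To illustrate the idea, here is a rough sketch that hoists the build-side references out of the per-probe-batch loop. It uses plain `Vec<i64>` columns as a stand-in for Arrow arrays; the `Batch` alias and the function names `column_refs` and `probe_all` are hypothetical and only serve the illustration.

```rust
/// Stand-in for a record batch: each inner Vec is one column.
/// (Hypothetical type for illustration, not Arrow's `RecordBatch`.)
type Batch = Vec<Vec<i64>>;

/// Collect references to column `col` across all build-side batches.
fn column_refs(batches: &[Batch], col: usize) -> Vec<&[i64]> {
    batches.iter().map(|b| b[col].as_slice()).collect()
}

/// For every probe batch, gather build-side values addressed by
/// `(batch index, row index)` pairs.
fn probe_all(build: &[Batch], probes: &[Vec<(usize, usize)>], col: usize) -> Vec<Vec<i64>> {
    // Built once, before the loop, instead of once per probe batch.
    let build_col = column_refs(build, col);
    probes
        .iter()
        .map(|indices| {
            let mut out = Vec::with_capacity(indices.len());
            for &(batch_idx, row_idx) in indices {
                out.push(build_col[batch_idx][row_idx]);
            }
            out
        })
        .collect()
}
```

If `column_refs` were called inside the closure instead, its cost would grow with both the number of probe batches and the number of build-side batches, which is the effect described above.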





[GitHub] [arrow] Dandandan commented on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752060487


   An important source of slowness seems to be the use (and inefficiency) of creating the `MutableArrayData` structure. In profiling I see a lot of time spent in `build_extend`, `freeze`, etc.
   
   Changing the piece of code to generate a `Vec<&ArrayData>` directly gives a ~10% speedup locally on batches of size 1000 on your branch @andygrove :
   ```rust
           let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) {
               Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
               Err(_) => {
                   match secondary[0].schema().index_of(field.name()) {
                       Ok(i) => Ok((false, secondary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
                       _ => Err(DataFusionError::Internal(
                           format!("During execution, the column {} was not found in either the left or the right side of the join", field.name())
                       ))
                   }
               }
           }.map_err(DataFusionError::into_arrow_external_error)?;
   ```





[GitHub] [arrow] Dandandan commented on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752675210


   @andygrove I thought more about this: I think we can use `indices.len()` as the *exact* required capacity rather than relying on previous sizes. I included that change, among others, in this PR: https://github.com/apache/arrow/pull/9048
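
As a rough illustration of sizing the output from the index list, the sketch below gathers values with a plain `Vec` instead of Arrow's builders. The function name `take_with_exact_capacity` is hypothetical; the only point is that the number of output rows equals `indices.len()`, so no guess is needed.

```rust
/// Gather `values[i]` for every `i` in `indices`.
/// The output length is known up front, so the buffer is reserved exactly once.
/// (Illustrative stand-in using `Vec`, not the Arrow API.)
fn take_with_exact_capacity(values: &[i64], indices: &[usize]) -> Vec<i64> {
    let mut out = Vec::with_capacity(indices.len());
    for &i in indices {
        out.push(values[i]);
    }
    out
}
```

Compared with learning a capacity from earlier batches, this avoids both over-allocation and reallocation whenever the indices are already materialized before the output is built.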





[GitHub] [arrow] Dandandan commented on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752250772


   I think this is something we should merge. We can maybe tweak the extra `1024` items, but this is one extra source of slowness for smaller batches. I also think it helps make the other sources clearer.





[GitHub] [arrow] Dandandan commented on a change in pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#discussion_r549556756



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -531,14 +561,45 @@ impl Stream for HashJoinStream {
         self.right
             .poll_next_unpin(cx)
             .map(|maybe_batch| match maybe_batch {
-                Some(Ok(batch)) => Some(build_batch(
-                    &batch,
-                    &self.left_data,
-                    &self.on_right,
-                    &self.join_type,
-                    &self.schema,
-                )),
-                other => other,
+                Some(Ok(batch)) => {
+                    let start = Instant::now();
+                    let capacity = if self.num_output_batches == 0 {

Review comment:
       In https://github.com/apache/arrow/pull/9032, benchmarks showed that it is currently a bit faster to initialize with capacity 0 than to use the correct capacity upfront. I think that is more of a curiosity and will hopefully change, but might that be the case here too? What is the performance if we initialize with 0 everywhere?







[GitHub] [arrow] Dandandan edited a comment on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752122091


   I think a further speedup could come from moving the construction of the left (build-side) `Vec<&ArrayData>` arrays so that they are created only once, rather than for each right batch in `build_batch_from_indices`. Currently, making the batch size smaller means the build-side Vec is built more often and also contains more (smaller) batches itself, which could explain (part of) the big / exponential slowdown on smaller batches.





[GitHub] [arrow] Dandandan edited a comment on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752060487


   An important source of slowness seems to be the use (and inefficiency) of creating the `ArrayData` and `MutableArrayData` structures. In profiling I see a lot of time spent in `build_extend`, `freeze`, etc.
   
   Changing the piece of code to generate a `Vec<&ArrayData>` directly gives a ~10-20% speedup locally on batches of size 1000 on your branch @andygrove :
   ```rust
           let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) {
               Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
               Err(_) => {
                   match secondary[0].schema().index_of(field.name()) {
                       Ok(i) => Ok((false, secondary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
                       _ => Err(DataFusionError::Internal(
                           format!("During execution, the column {} was not found in either the left or the right side of the join", field.name())
                       ))
                   }
               }
           }.map_err(DataFusionError::into_arrow_external_error)?;
   ```





[GitHub] [arrow] andygrove closed pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #9036:
URL: https://github.com/apache/arrow/pull/9036


   





[GitHub] [arrow] Dandandan edited a comment on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752122091


   I think a further speedup could come from moving the construction of the left (build-side) `Vec<&ArrayData>` so that it is created only once, rather than for each right batch in `build_batch_from_indices`. Currently, making the batch size smaller means the build-side Vec is built more often and also contains more (smaller) batches itself, which could explain (part of) the big / exponential slowdown on smaller batches.





[GitHub] [arrow] Dandandan edited a comment on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752060487


   An important source of slowness seems to be the use (and inefficiency) of creating the `ArrayData` and `MutableArrayData` structures. In profiling I see a lot of time spent in `build_extend`, `freeze`, etc.
   
   Changing the piece of code to generate a `Vec<&ArrayData>` directly gives a ~20% speedup locally on batches of size 1000 on your branch @andygrove :
   ```rust
           let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) {
               Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
               Err(_) => {
                   match secondary[0].schema().index_of(field.name()) {
                       Ok(i) => Ok((false, secondary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
                       _ => Err(DataFusionError::Internal(
                           format!("During execution, the column {} was not found in either the left or the right side of the join", field.name())
                       ))
                   }
               }
           }.map_err(DataFusionError::into_arrow_external_error)?;
   ```





[GitHub] [arrow] Dandandan edited a comment on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752250772


   I think this is something we should merge. We can maybe tweak the extra `1024` capacity, but this is one extra source of slowness for smaller batches. I also think it helps make the other sources clearer.





[GitHub] [arrow] github-actions[bot] commented on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-751884417


   https://issues.apache.org/jira/browse/ARROW-11053





[GitHub] [arrow] Dandandan edited a comment on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752122091


   I think a further speedup could come from moving the construction of the left (build-side) `Vec<&ArrayData>` so that it is created only once, rather than for each right batch. Currently, making the batch size smaller means the build-side Vec is built more often and also contains more (smaller) batches itself, which could explain (part of) the big / exponential slowdown on smaller batches.





[GitHub] [arrow] andygrove commented on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752685504


   Closed in favor of https://github.com/apache/arrow/pull/9048





[GitHub] [arrow] Dandandan edited a comment on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752060487


   An important source of slowness seems to be the use (and inefficiency) of creating the `ArrayData` and `MutableArrayData` structures. In profiling I see a lot of time spent in `build_extend`, `freeze`, etc.
   
   Changing the piece of code to generate a `Vec<&ArrayData>` directly gives a ~10-20% speedup locally on batches of size 1000 on your branch @andygrove :
   ```rust
           let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) {
               Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
               Err(_) => {
                   match secondary[0].schema().index_of(field.name()) {
                       Ok(i) => Ok((false, secondary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
                       _ => Err(DataFusionError::Internal(
                           format!("During execution, the column {} was not found in either the left or the right side of the join", field.name())
                       ))
                   }
               }
           }.map_err(DataFusionError::into_arrow_external_error)?;
   ```





[GitHub] [arrow] Dandandan commented on pull request #9036: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9036:
URL: https://github.com/apache/arrow/pull/9036#issuecomment-752067336


   Opened a PR with that change here: https://github.com/apache/arrow/pull/9042

