Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/29 20:31:29 UTC

[GitHub] [arrow] alamb opened a new pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

alamb opened a new pull request #8553:
URL: https://github.com/apache/arrow/pull/8553


   This PR is based on https://github.com/apache/arrow/pull/8503 from jorgecarleitao
   
   Note that this does not add any new dependencies (pin_project_lite is already required indirectly via tokio)
   
   
   
   
   Aggregate performance, as measured with the following command, improves by between 5 and 15 percent on my laptop, though the variance is quite high.
   ```
   git checkout master && \
       cargo bench --bench aggregate_query_sql && \
       git checkout alamb/ARROW-10366-collect-in-parallel && \
       cargo bench --bench aggregate_query_sql
   ```
   
   <details>
     <summary>Click for performance details</summary>
   
   
   aggregate_query_no_group_by 15 12
                           time:   [715.38 us 717.64 us 720.07 us]
                           change: [-5.6870% -4.7352% -3.6834%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 8 outliers among 100 measurements (8.00%)
     2 (2.00%) high mild
     6 (6.00%) high severe
   
   Benchmarking aggregate_query_group_by 15 12: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 13.7s or reduce sample count to 40.
   aggregate_query_group_by 15 12
                           time:   [2.4874 ms 2.5029 ms 2.5173 ms]
                           change: [-13.513% -12.381% -11.184%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 5 outliers among 100 measurements (5.00%)
     3 (3.00%) high mild
     2 (2.00%) high severe
   
   Benchmarking aggregate_query_group_by_with_filter 15 12: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 10.2s or reduce sample count to 40.
   aggregate_query_group_by_with_filter 15 12
                           time:   [1.9724 ms 1.9856 ms 1.9982 ms]
                           change: [-14.759% -10.342% -6.5522%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 5 outliers among 100 measurements (5.00%)
     2 (2.00%) high mild
     3 (3.00%) high severe
   
   
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-723025807


   Ah -- got it. 👍 





[GitHub] [arrow] alamb commented on a change in pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#discussion_r514551317



##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -214,13 +217,13 @@ Example: average
 * Once all N record batches arrive, `merge` is performed, which builds a RecordBatch with N rows and 2 columns.
 * Finally, `get_value` returns an array with one entry computed from the state
 */
-struct GroupedHashAggregateStream {
-    mode: AggregateMode,
-    schema: SchemaRef,
-    group_expr: Vec<Arc<dyn PhysicalExpr>>,
-    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    input: SendableRecordBatchStream,
-    finished: bool,
+pin_project! {

Review comment:
       I found the whole notion of having to Pin things to call `poll_next` on streams to be unnaturally confusing. While I basically understand what `pin_project!` does, I will admit I do not understand why Streams need to be `Pin`. I am just cargo culting it there
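
       For readers with the same question, a minimal sketch of where the `Pin` comes from, using only the standard library. `MiniStream`, `Countdown`, `Tally`, and `drain_countdown` are hypothetical names for illustration (`MiniStream` only mirrors the shape of `futures::Stream::poll_next`). A stream may hold references into itself across polls, so `poll_next` takes `Pin<&mut Self>` as a promise that the value will not move. A wrapper then has to turn its own `Pin<&mut Self>` into a `Pin<&mut Field>`; the sketch does this safely by assuming the inner stream is `Unpin`, and `pin_project!` is the tool for doing the same projection without that assumption.

       ```rust
       use std::pin::Pin;
       use std::sync::Arc;
       use std::task::{Context, Poll, Wake, Waker};

       // Hypothetical stand-in for `futures::Stream`: same `poll_next` shape.
       trait MiniStream {
           type Item;
           fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
       }

       // A trivial stream that yields n, n-1, ..., 1 and then ends.
       struct Countdown(u32);

       impl MiniStream for Countdown {
           type Item = u32;
           fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
               if self.0 == 0 {
                   return Poll::Ready(None);
               }
               let value = self.0;
               self.0 -= 1;
               Poll::Ready(Some(value))
           }
       }

       // Wrapper that counts the items it forwards. To poll `inner` it needs a
       // `Pin<&mut S>`; producing that from `Pin<&mut Self>` is the "projection".
       struct Tally<S> {
           inner: S,
           seen: usize,
       }

       impl<S: MiniStream + Unpin> MiniStream for Tally<S> {
           type Item = S::Item;
           fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
               // `get_mut` is only available because of the `Unpin` bound
               // (an assumption of this sketch, not of the PR).
               let this = self.get_mut();
               let item = Pin::new(&mut this.inner).poll_next(cx);
               if let Poll::Ready(Some(_)) = item {
                   this.seen += 1;
               }
               item
           }
       }

       // A waker that does nothing; enough to build a `Context` for polling by hand.
       struct NoopWake;
       impl Wake for NoopWake {
           fn wake(self: Arc<Self>) {}
       }

       // Drain the wrapper and report how many items passed through it.
       fn drain_countdown(n: u32) -> usize {
           let waker = Waker::from(Arc::new(NoopWake));
           let mut cx = Context::from_waker(&waker);
           let mut tally = Tally { inner: Countdown(n), seen: 0 };
           while let Poll::Ready(Some(_)) = Pin::new(&mut tally).poll_next(&mut cx) {}
           tally.seen
       }
       ```

       Calling `drain_countdown(3)` polls the wrapper until the inner stream ends and returns the number of items forwarded.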

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -315,6 +318,41 @@ fn group_aggregate_batch(
     Ok(accumulators)
 }
 
+async fn compute_grouped_hash_aggregate(

Review comment:
       The main change in this file is to pull the computation of the output record batch into a future (aka an async function). I think this formulation is still a natural way to express the logic (you can see each input batch being fed to `group_aggregate_batch`)
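
       The shape described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the code in the PR: `Batch`, `next_batch`, `aggregate_batch`, `compute_sums`, and the toy `block_on` executor are hypothetical, with `next_batch` standing in for `input.next().await` and `aggregate_batch` playing the role of `group_aggregate_batch`.

       ```rust
       use std::collections::{HashMap, VecDeque};
       use std::future::Future;
       use std::sync::Arc;
       use std::task::{Context, Poll, Wake, Waker};

       // Hypothetical stand-in for a record batch: rows of (group key, value).
       type Batch = Vec<(&'static str, i64)>;

       // Hypothetical stand-in for `input.next().await`.
       async fn next_batch(input: &mut VecDeque<Batch>) -> Option<Batch> {
           input.pop_front()
       }

       // Fold one batch into the running per-group sums.
       fn aggregate_batch(
           mut acc: HashMap<&'static str, i64>,
           batch: &Batch,
       ) -> HashMap<&'static str, i64> {
           for (key, value) in batch {
               *acc.entry(*key).or_insert(0) += *value;
           }
           acc
       }

       // The whole aggregation as one future: pull a batch, fold it, repeat.
       async fn compute_sums(mut input: VecDeque<Batch>) -> HashMap<&'static str, i64> {
           let mut acc = HashMap::new();
           while let Some(batch) = next_batch(&mut input).await {
               acc = aggregate_batch(acc, &batch);
           }
           acc
       }

       struct NoopWake;
       impl Wake for NoopWake {
           fn wake(self: Arc<Self>) {}
       }

       // Toy executor that polls in a loop; fine here because nothing returns Pending.
       fn block_on<F: Future>(fut: F) -> F::Output {
           let waker = Waker::from(Arc::new(NoopWake));
           let mut cx = Context::from_waker(&waker);
           let mut fut = Box::pin(fut);
           loop {
               if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
                   return out;
               }
           }
       }

       // Two small batches with an overlapping group key.
       fn example_sums() -> HashMap<&'static str, i64> {
           let input = VecDeque::from(vec![vec![("a", 1), ("b", 2)], vec![("a", 3)]]);
           block_on(compute_sums(input))
       }
       ```

       The loop in `compute_sums` reads top to bottom like the synchronous version, which is the readability point made in the review comment.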

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -324,12 +362,24 @@ impl GroupedHashAggregateStream {
         aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         input: SendableRecordBatchStream,
     ) -> Self {
+        let (tx, rx) = futures::channel::oneshot::channel();
+
+        let schema_clone = schema.clone();
+        tokio::spawn(async move {

Review comment:
       This spawns a separate task to do the actual aggregation (and drive the inputs, as needed, by calling `input.next().await`). Driving the input was not happening when using `future.into_stream` in the actual call to `poll_next` https://github.com/apache/arrow/pull/8553/files#diff-4dbe750a2836f6c723bc36ae39f2faf15e2706303f5dbb042ae00f1f36658c62L399
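
       A rough analogue of this pattern, assuming plain threads instead of an async runtime: `std::thread::spawn` and `std::sync::mpsc::channel` are swapped in for `tokio::spawn` and `futures::channel::oneshot`, and `AggregateResult`, `wait`, and `example_total` are hypothetical names. The point it illustrates is that the work starts in the constructor and proceeds on its own, while the consumer only holds the receiving end.

       ```rust
       use std::sync::mpsc;
       use std::thread;

       // Hypothetical stand-in for the stream struct: it only holds the receiver.
       struct AggregateResult {
           rx: mpsc::Receiver<i64>,
       }

       impl AggregateResult {
           // Start the aggregation right away on a separate worker.
           fn new(input: Vec<Vec<i64>>) -> Self {
               let (tx, rx) = mpsc::channel();
               thread::spawn(move || {
                   // The worker drives the input itself; it does not wait for the
                   // consumer to ask for the result first.
                   let total: i64 = input.iter().flatten().sum();
                   // The receiver may already be dropped; ignoring that is fine in a sketch.
                   let _ = tx.send(total);
               });
               AggregateResult { rx }
           }

           // Consumer side: block until the single result arrives.
           fn wait(self) -> Option<i64> {
               self.rx.recv().ok()
           }
       }

       fn example_total() -> Option<i64> {
           AggregateResult::new(vec![vec![1, 2], vec![3, 4]]).wait()
       }
       ```

       In the async version the consumer would poll the receiver instead of blocking on it, but the division of labour is the same.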







[GitHub] [arrow] alamb commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-720409056









[GitHub] [arrow] rdettai commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
rdettai commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-726895657


   I had some code that was crashing because of the behavior aggregate had when the wrapped exec first returned `Pending` when being polled. It now works perfectly with this PR! Thanks Andrew! Once this is merged I have some tests ready in #8658 that validate this!





[GitHub] [arrow] andygrove commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-719162254


   I will try and run some TPC benchmarks tomorrow. I'm excited to see how this looks. Great work @jorgecarleitao and @alamb 





[GitHub] [arrow] alamb commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-725024351


   Rebased





[GitHub] [arrow] alamb commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-726900488


   I plan to merge this tomorrow unless I hear otherwise. @jorgecarleitao / @andygrove let me know if you have any concerns





[GitHub] [arrow] alamb edited a comment on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb edited a comment on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-722690860









[GitHub] [arrow] alamb commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-727192549


   Rebased





[GitHub] [arrow] andygrove commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-722694100


   Thanks @alamb for posting the detailed info. I hope I haven't caused confusion with my earlier comments and performance data. I was not seeing a slowdown on this PR compared to master. I was seeing a slowdown in master compared to the 2.0.0 release (which I tracked down to the commit that removed specialization).





[GitHub] [arrow] github-actions[bot] commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-719004218


   https://issues.apache.org/jira/browse/ARROW-10366





[GitHub] [arrow] alamb commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-727201907


   FYI @rdettai , this PR has been merged





[GitHub] [arrow] alamb commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-722690860


   @jorgecarleitao  and @andygrove  -- I did some profiling of the tpch benchmark (SF10) locally on my laptop this afternoon. I was not able to reproduce a slowdown on this branch
   
   Here is my data from my laptop (a MacBook Pro):
   
   Test command was: `cargo run --release --bin tpch -- --iterations 3 --path /Users/alamb/Software/tpch_data/SF10-parquet-64 --format parquet --query 1 --batch-size 4096`
   
   When run against master @ d4121d8a17d9e53ad4421960e357dd2f89771603:
   
   ```
   Running benchmarks with the following options: TpchOpt { query: 1, debug: false, iterations: 3, concurrency: 2, batch_size: 4096, path: "/Users/alamb/Software/tpch_data/SF10-parquet-64", file_format: "parquet", mem_table: false }
   Query 1 iteration 0 took 8913 ms
   Query 1 iteration 1 took 10533 ms
   Query 1 iteration 2 took 10633 ms
   ```
   
   When run against this branch @ 802868617777022bd0945f98f987993ca7267eb6 (after rebasing against d4121d8a17d9e53ad4421960e357dd2f89771603):
   
   ```
   Running benchmarks with the following options: TpchOpt { query: 1, debug: false, iterations: 3, concurrency: 2, batch_size: 4096, path: "/Users/alamb/Software/tpch_data/SF10-parquet-64", file_format: "parquet", mem_table: false }
   Query 1 iteration 0 took 9168 ms
   Query 1 iteration 1 took 10097 ms
   Query 1 iteration 2 took 10199 ms
   ```
   
   So in other words I didn't measure a noticeable change on my machine
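   
   To put the two listings above side by side, here is a small sketch that averages the three iterations of each run and computes the relative change. The function names are hypothetical; the timings are copied from the output above. The branch mean comes out about 2% lower than master, which is well inside the spread between individual iterations, so it is consistent with "no noticeable change".

   ```rust
   // Mean of a set of timings in milliseconds.
   fn mean_ms(samples: &[u64]) -> f64 {
       samples.iter().sum::<u64>() as f64 / samples.len() as f64
   }

   // Relative change of `new` versus `base`, in percent (negative means faster).
   fn relative_change_percent(base: f64, new: f64) -> f64 {
       (new - base) / base * 100.0
   }

   // Timings for TPC-H query 1, three iterations each, from the runs above.
   fn tpch_q1_change_percent() -> f64 {
       let master: [u64; 3] = [8913, 10533, 10633];
       let branch: [u64; 3] = [9168, 10097, 10199];
       relative_change_percent(mean_ms(&master), mean_ms(&branch))
   }
   ```

   With only three iterations per run, and the first iteration on master differing from the other two by more than 1.5 seconds, a difference of this size should not be read as an improvement.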
   
   Qualitatively, both this branch and master kept my CPUs ~100% busy, and the memory used was fairly consistent
   
   I then spent some time poking around with a profile from XCode Instruments to see where the time was going. Unsurprisingly it is going to memory management (this is the story of my professional life):
   
   ![Screen Shot 2020-11-05 at 5 39 03 PM](https://user-images.githubusercontent.com/490673/98304485-0e42ad80-1f8e-11eb-8927-d0f0c5edae44.png)
   
   I am going to spend some more time seeing if I can find an easy way to recycle buffers or find a smoking gun





[GitHub] [arrow] alamb closed pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #8553:
URL: https://github.com/apache/arrow/pull/8553


   





[GitHub] [arrow] alamb commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-724935395


   @jorgecarleitao -- when I ran the TPCH benchmark Q1 locally on my machine, I found it kept all my cores busy and the memory profile was low. Thus the improvements offered by this PR of running more things in parallel and avoiding buffering don't address the (current) bottleneck in the TPCH query
   
   I theorize (without presenting evidence yet) that this is due to how the query is running and that:
   1. Runtime is dominated by filtering (not aggregation)
   2. The intermediate results are small (the output of the first stage of grouping is 4 rows)
   
   I have a more detailed performance analysis half written up, but it is not coherent enough to share yet. I am hoping for some time later this week to write it up
   
   I (selfishly) think this is an improvement, but I don't have any performance numbers to back that up





[GitHub] [arrow] alamb commented on pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#issuecomment-727201606


   The [Travis CI](https://github.com/apache/arrow/pull/8553/checks?check_run_id=1399712911) checks (which aren't for Rust-specific things) seem to be stuck / not working. I don't think they are related to this PR (e.g. the same thing happened on [this unrelated PR](https://github.com/apache/arrow/pull/8662/checks?check_run_id=1399439929)), so merging





[GitHub] [arrow] alamb commented on a change in pull request #8553: ARROW-10366: [Rust][DataFusion] Do not buffer intermediate results in merge or HashAggregate

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8553:
URL: https://github.com/apache/arrow/pull/8553#discussion_r514549229



##########
File path: rust/datafusion/Cargo.toml
##########
@@ -56,6 +56,7 @@ num_cpus = "1.13.0"
 chrono = "0.4"
 async-trait = "0.1.41"
 futures = "0.3"
+pin-project-lite= "^0.1.1"

Review comment:
       Note that this does not add any new actual dependencies (pin_project_lite is already required indirectly via tokio)
   



