Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/16 14:10:30 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #8480: ARROW-10327: [Rust] [DataFusion] Replace iterator of batches by iterator of future batch

jorgecarleitao opened a new pull request #8480:
URL: https://github.com/apache/arrow/pull/8480


   This PR is a proposal to change our iterators over `RecordBatch` into iterators over `Future<RecordBatch>`, thereby making a compute operation over a single batch the "unit of work".
   
   The rationale here is that our expensive operations are over record batches, and those are the operations that benefit from being split into smaller units (and multi-threaded).
   
   This PR also adds `tokio::spawn` calls to some operators, with the expectation that the scheduler can run them on multiple threads.
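To make "one batch per unit of work" concrete, here is a minimal std-only sketch, not the PR's code. It assumes `std::thread::scope` as a stand-in for `tokio::spawn` and a plain `Vec<f64>` as a stand-in for a `RecordBatch`; the names `Batch`, `sqrt_projection` and `project_all` are hypothetical. Every batch is handed to its own thread, so the per-batch math can run in parallel.

```rust
use std::thread;

/// Hypothetical stand-in for an Arrow `RecordBatch`: a single f64 column.
type Batch = Vec<f64>;

/// The per-batch "unit of work": a math projection (sqrt) over one batch.
fn sqrt_projection(batch: &Batch) -> Batch {
    batch.iter().map(|v| v.sqrt()).collect()
}

/// Runs `sqrt_projection` on every batch concurrently, one scoped thread per
/// batch (a std-only stand-in for spawning one task per batch).
fn project_all(batches: &[Batch]) -> Vec<Batch> {
    thread::scope(|s| {
        // Spawn every unit of work first so they can run in parallel...
        let handles: Vec<_> = batches
            .iter()
            .map(|batch| s.spawn(move || sqrt_projection(batch)))
            .collect();
        // ...then wait for each of them and collect the projected batches.
        handles
            .into_iter()
            .map(|h| h.join().expect("projection thread panicked"))
            .collect()
    })
}

fn main() {
    let batches: Vec<Batch> = vec![vec![1.0, 4.0], vec![9.0], vec![16.0, 25.0, 36.0]];
    let projected = project_all(&batches);
    println!("{:?}", projected); // [[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]]
}
```

Spawning one thread per batch is only reasonable for a handful of batches; a task scheduler such as tokio exists precisely to multiplex many such units of work over a fixed pool.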
   
   The micro-benchmarks are not very indicative, as this change mainly affects larger inputs, but as a rough idea, I get a 38% to 47% reduction in time for the larger math projections and a 4% to 28% increase for aggregations (`sqrt_20_9` also regressed by about 21%).
   
   The aggregations hold a single mutex over all groups for the whole per-batch update, which serializes them and may explain this result.
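A small std-only sketch of why one lock over every group hurts, and of one common alternative. It is an illustration under assumptions, not DataFusion's accumulator code: threads stand in for tokio tasks, a batch is a hypothetical `Vec<(String, i64)>` of (group key, value) rows, and `aggregate_with_global_mutex` / `aggregate_with_partials` are hypothetical names. The first version locks one shared `HashMap` for each batch's entire update, so the per-batch work effectively runs one batch at a time; the second folds each batch into a private partial state without locking and merges the partial states at the end, in the spirit of the partial-then-`merge` scheme described in `hash_aggregate.rs`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical row: (group key, value). A "batch" is a Vec of rows.
type Row = (String, i64);

/// Each task holds the single global lock while it folds in its whole batch,
/// so the per-batch aggregation work is serialized.
fn aggregate_with_global_mutex(batches: Vec<Vec<Row>>) -> HashMap<String, i64> {
    let state: Arc<Mutex<HashMap<String, i64>>> = Arc::new(Mutex::new(HashMap::new()));
    let handles: Vec<_> = batches
        .into_iter()
        .map(|batch| {
            let state = Arc::clone(&state);
            thread::spawn(move || {
                let mut state = state.lock().unwrap();
                for (key, value) in batch {
                    *state.entry(key).or_insert(0) += value;
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    Arc::try_unwrap(state).unwrap().into_inner().unwrap()
}

/// Each task builds a private partial state with no locking; the partial
/// states are merged once every task has finished.
fn aggregate_with_partials(batches: Vec<Vec<Row>>) -> HashMap<String, i64> {
    let handles: Vec<_> = batches
        .into_iter()
        .map(|batch| {
            thread::spawn(move || {
                let mut partial: HashMap<String, i64> = HashMap::new();
                for (key, value) in batch {
                    *partial.entry(key).or_insert(0) += value;
                }
                partial
            })
        })
        .collect();
    // Merge step: combine the per-batch partial states into the final state.
    let mut merged: HashMap<String, i64> = HashMap::new();
    for h in handles {
        for (key, value) in h.join().unwrap() {
            *merged.entry(key).or_insert(0) += value;
        }
    }
    merged
}

fn main() {
    let batches = vec![
        vec![("a".to_string(), 1), ("b".to_string(), 2)],
        vec![("a".to_string(), 3)],
    ];
    let locked = aggregate_with_global_mutex(batches.clone());
    let partial = aggregate_with_partials(batches);
    assert_eq!(locked, partial);
    println!("{:?}", partial.get("a")); // Some(4)
}
```

Both versions produce the same groups; the difference is only in how much of the per-batch work can overlap.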
   
   <details>
   <summary>Benchmarks</summary>
   
   Math:
   ```
   sqrt_20_9               time:   [7.6861 ms 7.7328 ms 7.7805 ms]                      
                           change: [+20.288% +21.552% +22.732%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 4 outliers among 100 measurements (4.00%)
     4 (4.00%) high mild
   
   Benchmarking sqrt_20_12: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 9.4s, enable flat sampling, or reduce sample count to 50.
   sqrt_20_12              time:   [1.7891 ms 1.7954 ms 1.8020 ms]                        
                           change: [-38.768% -37.852% -36.578%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 10 outliers among 100 measurements (10.00%)
     6 (6.00%) high mild
     4 (4.00%) high severe
   
   sqrt_22_12              time:   [8.4315 ms 8.5324 ms 8.6348 ms]                       
                           change: [-39.677% -38.299% -36.893%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 1 outliers among 100 measurements (1.00%)
     1 (1.00%) high mild
   
   sqrt_22_14              time:   [11.515 ms 11.759 ms 12.054 ms]                       
                           change: [-48.307% -47.200% -45.854%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 10 outliers among 100 measurements (10.00%)
     4 (4.00%) high mild
     6 (6.00%) high severe
   ```
   
   Aggregates:
   ```
   aggregate_query_no_group_by 15 12                                                                            
                           time:   [831.70 us 836.65 us 842.22 us]
                           change: [+8.0888% +9.9290% +11.904%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 10 outliers among 100 measurements (10.00%)
     1 (1.00%) low mild
     2 (2.00%) high mild
     7 (7.00%) high severe
   
   aggregate_query_group_by 15 12                                                                            
                           time:   [5.9246 ms 5.9763 ms 6.0367 ms]
                           change: [+3.1496% +4.1417% +5.1472%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 13 outliers among 100 measurements (13.00%)
     6 (6.00%) high mild
     7 (7.00%) high severe
   
   aggregate_query_group_by_with_filter 15 12                                                                             
                           time:   [3.4054 ms 3.4322 ms 3.4597 ms]
                           change: [+26.844% +27.870% +28.979%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 1 outliers among 100 measurements (1.00%)
     1 (1.00%) high mild
   ```
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8480: ARROW-10327: [Rust] [DataFusion] Replace iterator of batches by iterator of future batch

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8480:
URL: https://github.com/apache/arrow/pull/8480#discussion_r506512980



##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -210,13 +260,60 @@ Example: average
 * Once all N record batches arrive, `merge` is performed, which builds a RecordBatch with N rows and 2 columns.
 * Finally, `get_value` returns an array with one entry computed from the state
 */
-struct GroupedHashAggregateIterator {
-    mode: AggregateMode,
-    schema: SchemaRef,
+async fn grouped_aggregate(
+    input: DynFutureRecordBatchIterator,
     group_expr: Vec<Arc<dyn PhysicalExpr>>,
     aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    input: SendableRecordBatchReader,
-    finished: bool,
+    schema: SchemaRef,
+    mode: AggregateMode,
+) -> ArrowResult<RecordBatch> {
+    // the expressions to evaluate the batch, one vec of expressions per aggregation
+    let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode)
+        .map_err(|e| ExecutionError::into_arrow_external_error(e))?;
+
+    // mapping key -> (set of accumulators, indices of the key in the batch)
+    // * the indexes are updated at each row
+    // * the accumulators are updated at the end of each batch
+    // * the indexes are `clear`ed at the end of each batch
+    let accumulators: FnvHashMap<Vec<GroupByScalar>, (AccumulatorSet, Box<Vec<u32>>)> =
+        FnvHashMap::default();
+
+    // this will be shared by multiple threads, which will update the accumulator as required.
+    // todo: a mutex over all groups is _brutal_: this requires more care
+    let accumulators = Arc::new(Mutex::new(accumulators));
+    // place under an arc to avoid cloning vectors
+    let aggregate_expressions = Arc::new(aggregate_expressions);
+
+    let futures = input.map(|future_batch| {
+        // send each aggregation to its own thread
+        let accumulators = accumulators.clone();
+        let aggr_expr = aggr_expr.clone();
+        let aggregate_expressions = aggregate_expressions.clone();
+        let group_expr = group_expr.clone();
+        tokio::spawn(async move {
+            let batch = future_batch.await?;
+            let mut accumulators = accumulators.lock().unwrap();
+            group_aggregate_batch(
+                &mode,
+                &group_expr,
+                &aggr_expr,
+                &batch,
+                &mut accumulators,
+                &aggregate_expressions,
+            )
+            .map_err(ExecutionError::into_arrow_external_error)
+        })
+    });
+
+    // parallel computation of the aggregation
+    try_join_all(futures)

Review comment:
       This pattern has the downside that the entire input to `hash_aggregate` may need to be buffered if the aggregate consumes data more slowly than it is produced.
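One common remedy for this kind of unbounded buffering is backpressure through a bounded queue. The std-only sketch below is an illustration, not what the PR or DataFusion does: it uses `std::sync::mpsc::sync_channel` in place of an async stream, and `run_bounded_pipeline` and its `capacity` parameter are hypothetical. The producer blocks in `send` whenever `capacity` batches are already queued, so a slow consumer limits how far ahead the producer can run; the measured peak is at most `capacity` queued batches, plus one just received and one being sent.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical sketch: a producer streams `n` batches (here just their ids)
/// through a queue holding at most `capacity` items to a deliberately slow
/// consumer. Returns the consumed ids and the peak number of batches that
/// were produced but not yet consumed.
fn run_bounded_pipeline(n: usize, capacity: usize) -> (Vec<usize>, usize) {
    let (tx, rx) = sync_channel::<usize>(capacity);
    // Batches the producer has started sending minus batches the consumer has received.
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let producer = {
        let in_flight = Arc::clone(&in_flight);
        let peak = Arc::clone(&peak);
        thread::spawn(move || {
            for id in 0..n {
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                // Blocks while `capacity` batches are already queued: backpressure.
                tx.send(id).expect("consumer hung up");
            }
            // `tx` is dropped here, which closes the channel and ends the consumer loop.
        })
    };

    let mut consumed = Vec::with_capacity(n);
    for id in rx {
        in_flight.fetch_sub(1, Ordering::SeqCst);
        thread::sleep(Duration::from_millis(1)); // simulate a slow aggregate
        consumed.push(id);
    }
    producer.join().unwrap();
    (consumed, peak.load(Ordering::SeqCst))
}

fn main() {
    let (consumed, peak) = run_bounded_pipeline(20, 2);
    assert_eq!(consumed, (0..20).collect::<Vec<_>>());
    println!("peak batches in flight: {}", peak); // at most 2 + 2
}
```

By contrast, spawning a task for every input batch up front, as in the diff above, places no such limit on how many batches are in flight at once.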







[GitHub] [arrow] github-actions[bot] commented on pull request #8480: ARROW-10327: [Rust] [DataFusion] Replace iterator of batches by iterator of future batch

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8480:
URL: https://github.com/apache/arrow/pull/8480#issuecomment-710081684


   https://issues.apache.org/jira/browse/ARROW-10327





[GitHub] [arrow] andygrove commented on pull request #8480: ARROW-10327: [Rust] [DataFusion] Replace iterator of batches by iterator of future batch

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8480:
URL: https://github.com/apache/arrow/pull/8480#issuecomment-710094131


   If I'm understanding this correctly, this approach means that ordering is no longer deterministic, which will cause problems for operators like `SortExec` and, in turn, for other future operators like `SortAggregateExec` and `SortMergeJoinExec`.





[GitHub] [arrow] jorgecarleitao commented on pull request #8480: ARROW-10327: [Rust] [DataFusion] Replace iterator of batches by iterator of future batch

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8480:
URL: https://github.com/apache/arrow/pull/8480#issuecomment-710104246


   @andygrove and @alamb, I agree with you: the other pattern is sufficient. Thanks a lot for the comments and for the teaching 💯 





[GitHub] [arrow] jorgecarleitao closed pull request #8480: ARROW-10327: [Rust] [DataFusion] Replace iterator of batches by iterator of future batch

Posted by GitBox <gi...@apache.org>.
jorgecarleitao closed pull request #8480:
URL: https://github.com/apache/arrow/pull/8480


   





[GitHub] [arrow] jorgecarleitao commented on pull request #8480: ARROW-10327: [Rust] [DataFusion] Replace iterator of batches by iterator of future batch

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8480:
URL: https://github.com/apache/arrow/pull/8480#issuecomment-710096202


   * At least currently, all batches are combined into a single batch before sorting.
   
   * `join_all` [preserves the order](https://docs.rs/futures/0.3.6/futures/future/fn.join_all.html):
   
   > The returned future will drive execution for all of its underlying futures, collecting the results into a destination Vec<T> in the same order as they were provided.
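Both points can be checked with a small std-only sketch, under assumptions: threads stand in for futures, and `concat_then_sort` and `run_in_submission_order` are hypothetical names rather than DataFusion or `futures` APIs. The first function mirrors the first bullet: when every batch is concatenated before sorting, the order in which batches arrive cannot change the sorted output. The second mirrors the `join_all` guarantee: tasks may finish in any order, but joining their handles in submission order yields results in input order.

```rust
use std::thread;
use std::time::Duration;

/// Concatenate all batches into one, then sort: the result does not depend
/// on the order in which the batches were produced.
fn concat_then_sort(batches: Vec<Vec<i32>>) -> Vec<i32> {
    let mut all: Vec<i32> = batches.into_iter().flatten().collect();
    all.sort();
    all
}

/// Run one task per input; later inputs sleep less, so they tend to finish
/// first. Joining the handles in submission order still returns results in
/// input order, analogous to what `join_all` documents.
fn run_in_submission_order(inputs: Vec<u64>) -> Vec<u64> {
    let len = inputs.len() as u64;
    let handles: Vec<_> = inputs
        .into_iter()
        .enumerate()
        .map(|(i, x)| {
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(len - i as u64));
                x * 10
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let a = concat_then_sort(vec![vec![3, 1], vec![2]]);
    let b = concat_then_sort(vec![vec![2], vec![3, 1]]);
    assert_eq!(a, b); // both are [1, 2, 3]
    println!("{:?}", run_in_submission_order(vec![1, 2, 3])); // [10, 20, 30]
}
```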





[GitHub] [arrow] alamb commented on pull request #8480: ARROW-10327: [Rust] [DataFusion] Replace iterator of batches by iterator of future batch

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8480:
URL: https://github.com/apache/arrow/pull/8480#issuecomment-710203668


   Thank *you* for doing the work to drive this forward -- it is super awesome
   
   On Fri, Oct 16, 2020 at 11:19 AM Jorge Leitao <no...@github.com>
   wrote:
   
   > Closed #8480 <https://github.com/apache/arrow/pull/8480>.
   

