You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/16 14:56:33 UTC

[GitHub] [arrow] alamb commented on a change in pull request #8480: ARROW-10327: [Rust] [DataFusion] Replace iterator of batches by iterator of future batch

alamb commented on a change in pull request #8480:
URL: https://github.com/apache/arrow/pull/8480#discussion_r506512980



##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -210,13 +260,60 @@ Example: average
 * Once all N record batches arrive, `merge` is performed, which builds a RecordBatch with N rows and 2 columns.
 * Finally, `get_value` returns an array with one entry computed from the state
 */
-struct GroupedHashAggregateIterator {
-    mode: AggregateMode,
-    schema: SchemaRef,
+async fn grouped_aggregate(
+    input: DynFutureRecordBatchIterator,
     group_expr: Vec<Arc<dyn PhysicalExpr>>,
     aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    input: SendableRecordBatchReader,
-    finished: bool,
+    schema: SchemaRef,
+    mode: AggregateMode,
+) -> ArrowResult<RecordBatch> {
+    // the expressions to evaluate the batch, one vec of expressions per aggregation
+    let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode)
+        .map_err(|e| ExecutionError::into_arrow_external_error(e))?;
+
+    // mapping key -> (set of accumulators, indices of the key in the batch)
+    // * the indexes are updated at each row
+    // * the accumulators are updated at the end of each batch
+    // * the indexes are `clear`ed at the end of each batch
+    let accumulators: FnvHashMap<Vec<GroupByScalar>, (AccumulatorSet, Box<Vec<u32>>)> =
+        FnvHashMap::default();
+
+    // this will be shared by multiple threads, which will update the accumulator as required.
+    // todo: a mutex over all groups is _brutal_: this requires more care
+    let accumulators = Arc::new(Mutex::new(accumulators));
+    // place under an arc to avoid cloning vectors
+    let aggregate_expressions = Arc::new(aggregate_expressions);
+
+    let futures = input.map(|future_batch| {
+        // send each aggregation to its own thread
+        let accumulators = accumulators.clone();
+        let aggr_expr = aggr_expr.clone();
+        let aggregate_expressions = aggregate_expressions.clone();
+        let group_expr = group_expr.clone();
+        tokio::spawn(async move {
+            let batch = future_batch.await?;
+            let mut accumulators = accumulators.lock().unwrap();
+            group_aggregate_batch(
+                &mode,
+                &group_expr,
+                &aggr_expr,
+                &batch,
+                &mut accumulators,
+                &aggregate_expressions,
+            )
+            .map_err(ExecutionError::into_arrow_external_error)
+        })
+    });
+
+    // parallel computation of the aggregation
+    try_join_all(futures)

Review comment:
       This pattern has a potential downside: if the aggregate consumes data more slowly than it can be produced, the entire input to the hash_aggregate may have to be buffered.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org