You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/17 04:13:40 UTC

[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

jorgecarleitao commented on a change in pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#discussion_r506791355



##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -505,62 +513,88 @@ fn aggregate_batch(
 
             // 1.3
             match mode {
-                AggregateMode::Partial => accum.update_batch(values),
-                AggregateMode::Final => accum.merge_batch(values),
+                AggregateMode::Partial => {
+                    accum.update_batch(values)?;
+                }
+                AggregateMode::Final => {
+                    accum.merge_batch(values)?;
+                }
             }
+            Ok(accum)
         })
-        .collect::<Result<()>>()
+        .collect::<Result<Vec<_>>>()
 }
 
-impl Iterator for HashAggregateIterator {
+impl Stream for HashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
 
-    fn next(&mut self) -> Option<Self::Item> {
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         if self.finished {
-            return None;
+            return Poll::Ready(None);
         }
 
         // return single batch
         self.finished = true;
 
-        let mut accumulators = match create_accumulators(&self.aggr_expr) {
+        let accumulators = match create_accumulators(&self.aggr_expr) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
 
         let expressions = match aggregate_expressions(&self.aggr_expr, &self.mode) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
+        let expressions = Arc::new(expressions);
 
         let mode = self.mode;
         let schema = self.schema();
 
         // 1 for each batch, update / merge accumulators with the expressions' values
-        match self
+        // future is ready when all batches are computed
+        let future = self
             .input
             .as_mut()
-            .into_iter()
-            .map(|batch| {
-                aggregate_batch(&mode, &batch?, &mut accumulators, &expressions)
-                    .map_err(ExecutionError::into_arrow_external_error)
-            })
-            .collect::<ArrowResult<()>>()
-        {
-            Err(e) => return Some(Err(e)),
-            Ok(_) => {}
-        }
+            .try_fold(
+                // pass the expressions on every fold to handle closures' mutability
+                (accumulators, expressions),
+                |(acc, expr), batch| async move {
+                    aggregate_batch(&mode, &batch, acc, &expr)
+                        .map_err(ExecutionError::into_arrow_external_error)
+                        .map(|agg| (agg, expr))
+                },
+            )
+            // pick the accumulators (disregard the expressions)
+            .map(|e| e.map(|e| e.0));
+
+        let future = future.map(|b| {
+            match b {
+                Err(e) => return Err(e),
+                Ok(acc) => {
+                    // 2 convert values to a record batch

Review comment:
       It is `2.`, referring to item number two in the list of items above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org