You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/25 17:29:16 UTC

[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #413: finish implementation of window accumulation and func `row_number()` and add `sql.rs` test

Dandandan commented on a change in pull request #413:
URL: https://github.com/apache/arrow-datafusion/pull/413#discussion_r639013631



##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -186,10 +301,252 @@ impl ExecutionPlan for WindowAggExec {
             ));
         }
 
-        // let input = self.input.execute(0).await?;
+        let input = self.input.execute(partition).await?;
+
+        let stream = Box::pin(WindowAggStream::new(
+            self.schema.clone(),
+            self.window_expr.clone(),
+            input,
+        ));
+        Ok(stream)
+    }
+}
+
+pin_project! {
+    /// stream for window aggregation plan
+    pub struct WindowAggStream {
+        schema: SchemaRef,
+        #[pin]
+        output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
+        finished: bool,
+    }
+}
+
+type WindowAccumulatorItem = Box<dyn WindowAccumulator>;
+
+fn window_expressions(
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
+    Ok(window_expr
+        .iter()
+        .map(|expr| expr.expressions())
+        .collect::<Vec<_>>())
+}
+
+fn window_aggregate_batch(
+    batch: &RecordBatch,
+    window_accumulators: &mut [WindowAccumulatorItem],
+    expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<Vec<Option<Vec<ScalarValue>>>> {
+    // 1.1 iterate accumulators and respective expressions together
+    // 1.2 evaluate expressions
+    // 1.3 update / merge window accumulators with the expressions' values
+
+    // 1.1
+    window_accumulators
+        .iter_mut()
+        .zip(expressions)
+        .map(|(window_acc, expr)| {
+            // 1.2
+            let values = &expr
+                .iter()
+                .map(|e| e.evaluate(batch))
+                .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+                .collect::<Result<Vec<_>>>()?;
+
+            window_acc.scan_batch(batch.num_rows(), values)
+        })
+        .into_iter()
+        .collect::<Result<Vec<_>>>()
+}
+
+/// returns a vector of ArrayRefs, where each entry corresponds to either the
+/// final value (mode = Final) or states (mode = Partial)
+fn finalize_window_aggregation(
+    window_accumulators: &[WindowAccumulatorItem],
+) -> Result<Vec<Option<ScalarValue>>> {
+    window_accumulators
+        .iter()
+        .map(|window_accumulator| window_accumulator.evaluate())
+        .collect::<Result<Vec<_>>>()
+}
+
+fn create_window_accumulators(
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Vec<WindowAccumulatorItem>> {
+    window_expr
+        .iter()
+        .map(|expr| expr.create_accumulator())
+        .collect::<Result<Vec<_>>>()
+}
+
+async fn compute_window_aggregate(
+    schema: SchemaRef,
+    window_expr: Vec<Arc<dyn WindowExpr>>,
+    mut input: SendableRecordBatchStream,
+) -> ArrowResult<RecordBatch> {
+    let mut window_accumulators = create_window_accumulators(&window_expr)
+        .map_err(DataFusionError::into_arrow_external_error)?;
+
+    let expressions = window_expressions(&window_expr)
+        .map_err(DataFusionError::into_arrow_external_error)?;
+
+    let expressions = Arc::new(expressions);
+
+    // TODO each element shall have some size hint
+    let mut accumulator: Vec<Vec<ScalarValue>> =
+        iter::repeat(vec![]).take(window_expr.len()).collect();
+
+    let mut original_batches: Vec<RecordBatch> = vec![];
+
+    let mut total_num_rows = 0;
+
+    while let Some(batch) = input.next().await {
+        let batch = batch?;
+        total_num_rows += batch.num_rows();
+        original_batches.push(batch.clone());
+
+        let batch_aggregated =
+            window_aggregate_batch(&batch, &mut window_accumulators, &expressions)
+                .map_err(DataFusionError::into_arrow_external_error)?;
+        accumulator.iter_mut().zip(batch_aggregated).for_each(
+            |(acc_for_window, window_batch)| {
+                if let Some(data) = window_batch {
+                    acc_for_window.extend(data);
+                }
+            },
+        );
+    }
+
+    let aggregated_mapped = finalize_window_aggregation(&window_accumulators)
+        .map_err(DataFusionError::into_arrow_external_error)?;
+
+    let mut columns: Vec<ArrayRef> = accumulator
+        .iter()
+        .zip(aggregated_mapped)
+        .map(|(acc, agg)| match (acc, agg) {
+            // either accumulator values or the aggregated values are non-empty, but not both
+            (acc, Some(scalar_value)) if acc.is_empty() => {
+                Ok(scalar_value.to_array_of_size(total_num_rows))
+            }
+            (acc, None) if !acc.is_empty() => ScalarValue::iter_to_array(acc),
+            _ => Err(DataFusionError::Execution(
+                "Invalid window function behavior".to_owned(),
+            )),
+        })
+        .collect::<Result<Vec<ArrayRef>>>()
+        .map_err(DataFusionError::into_arrow_external_error)?;
+
+    for i in 0..(schema.fields().len() - window_expr.len()) {
+        let col = concat(

Review comment:
       I guess this is only needed for the empty `over` clause, right? In other cases we probably only want to access the rows.
   Also, for functions like `RowNumber`, couldn't we stream/emit batches instead of "caching" them?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org