Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/25 16:55:47 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #375: add window expression stream, delegated window aggregation to aggregate functions, and implement `row_number`

alamb commented on a change in pull request #375:
URL: https://github.com/apache/arrow-datafusion/pull/375#discussion_r638982807



##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -509,6 +540,43 @@ pub trait Accumulator: Send + Sync + Debug {
     fn evaluate(&self) -> Result<ScalarValue>;
 }
 
+/// A window accumulator represents a stateful object that lives throughout the evaluation of multiple
+/// rows and generically accumulates values.
+///
+/// An accumulator knows how to:
+/// * update its state from inputs via `update`
+/// * convert its internal state to a vector of scalar values
+/// * update its state from multiple accumulators' states via `merge`
+/// * compute the final value from its internal state via `evaluate`
+pub trait WindowAccumulator: Send + Sync + Debug {
+    /// scans the accumulator's state from a vector of scalars, similar to Accumulator it also
+    /// optionally generates values.
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>>;

Review comment:
       I see. Looking at how these functions are used for `nth_value` [here](https://github.com/apache/arrow-datafusion/pull/413/files#diff-d8b818bb6ef18a0609ad98574fed66ad4b9ea79338e89d184db7dd5b0218f19aR205) helped me get a better sense of how these traits work.
   
   As you go through the implementation, I think it would help future implementers of this trait if you added some additional details. For example: when is it OK to return values from `scan()` or `scan_batch()`, and what is the expected total number of rows produced?
   
   However, I think having several examples of implemented window functions will help too, so there is no need to change anything more at this time.
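   One possible answer to the question above, written as a std-only sketch. The simplified `ScalarValue`, `WindowAccumulator`, `RowNumber`, `Sum` and `run_window` below are hypothetical stand-ins, not DataFusion's types. The contract they encode is also an assumption, not the trait's documented behavior: a window function either returns a value from every `scan()` call (like `row_number`) or from none of them (like `sum(...) over ()`), in which case its final value is repeated for every row.

```rust
// Hypothetical, simplified stand-ins; the real types live in the `datafusion` crate.
#[derive(Debug, Clone, PartialEq)]
pub enum ScalarValue {
    Int64(Option<i64>),
    UInt64(Option<u64>),
}

pub trait WindowAccumulator {
    /// Called once per input row; `Some(v)` is that row's output (assumed contract).
    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>, String>;
    /// Final value for functions that returned `None` from every `scan`.
    fn evaluate(&self) -> Result<Option<ScalarValue>, String>;
}

/// `row_number()`: ignores its inputs and emits exactly one value per row.
#[derive(Debug, Default)]
pub struct RowNumber {
    count: u64,
}

impl WindowAccumulator for RowNumber {
    fn scan(&mut self, _values: &[ScalarValue]) -> Result<Option<ScalarValue>, String> {
        self.count += 1;
        Ok(Some(ScalarValue::UInt64(Some(self.count))))
    }
    fn evaluate(&self) -> Result<Option<ScalarValue>, String> {
        Ok(None)
    }
}

/// `sum(x) over ()`: emits nothing per row, only a final value.
#[derive(Debug, Default)]
pub struct Sum {
    total: Option<i64>,
}

impl WindowAccumulator for Sum {
    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>, String> {
        match values.first() {
            Some(ScalarValue::Int64(Some(v))) => {
                self.total = Some(self.total.unwrap_or(0) + v);
                Ok(None)
            }
            Some(ScalarValue::Int64(None)) => Ok(None), // NULLs are skipped
            other => Err(format!("unsupported input: {:?}", other)),
        }
    }
    fn evaluate(&self) -> Result<Option<ScalarValue>, String> {
        Ok(Some(ScalarValue::Int64(self.total)))
    }
}

/// Drives an accumulator and enforces the assumed contract: the total number
/// of output values always equals the number of input rows.
pub fn run_window(
    acc: &mut dyn WindowAccumulator,
    rows: &[Vec<ScalarValue>],
) -> Result<Vec<ScalarValue>, String> {
    let mut per_row = Vec::with_capacity(rows.len());
    for row in rows {
        if let Some(v) = acc.scan(row)? {
            per_row.push(v);
        }
    }
    if per_row.len() == rows.len() {
        return Ok(per_row);
    }
    if per_row.is_empty() {
        // No per-row output: broadcast the final value to every row.
        let v = acc.evaluate()?.ok_or("no final value")?;
        return Ok(vec![v; rows.len()]);
    }
    Err(format!("{} of {} rows produced values", per_row.len(), rows.len()))
}
```

   Making the "all or nothing" rule explicit in a driver like `run_window` is one way to let each implementer answer "how many rows do I produce?" locally, while a mismatch surfaces as an error instead of a misaligned output column.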

##########
File path: datafusion/tests/sql.rs
##########
@@ -797,20 +797,30 @@ async fn csv_query_count() -> Result<()> {
     Ok(())
 }
 
-// FIXME uncomment this when exec is done
-// #[tokio::test]
-// async fn csv_query_window_with_empty_over() -> Result<()> {
-//     let mut ctx = ExecutionContext::new();
-//     register_aggregate_csv(&mut ctx)?;
-//     let sql = "SELECT count(c12) over () FROM aggregate_test_100";
-//     // FIXME: so far the WindowAggExec is not implemented
-//     // and the current behavior is to throw not implemented exception
-
-//     let result = execute(&mut ctx, sql).await;
-//     let expected: Vec<Vec<String>> = vec![];
-//     assert_eq!(result, expected);
-//     Ok(())
-// }
+#[tokio::test]
+async fn csv_query_window_with_empty_over() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv(&mut ctx)?;
+    let sql = "select \
+    c2, \
+    row_number() over (),
+    sum(c3) over (), \
+    avg(c3) over (), \
+    count(c3) over (), \
+    max(c3) over (), \
+    min(c3) over () \
+    from aggregate_test_100 limit 5";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![

Review comment:
       👍 
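   With an empty `OVER ()` clause, the window for every row is the whole input, so each aggregate in this test is computed once over all rows of `aggregate_test_100` and repeated on every output row, while `row_number()` simply numbers the rows. Window functions are evaluated before `limit 5` trims the output. A std-only sketch of those semantics for a nullable `c3` column follows; `WindowRow` and `over_empty_window` are hypothetical names, and the row numbers follow input order only because `OVER ()` specifies no ordering.

```rust
/// One output row of a query shaped like the test above, over a nullable
/// i64 column (hypothetical type; DataFusion produces Arrow arrays instead).
#[derive(Debug, Clone, PartialEq)]
pub struct WindowRow {
    pub row_number: u64,
    pub sum: Option<i64>,
    pub avg: Option<f64>,
    pub count: u64,
    pub max: Option<i64>,
    pub min: Option<i64>,
}

/// Evaluates each `OVER ()` aggregate once over *all* input rows, repeats it
/// on every row, and only then applies the LIMIT.
pub fn over_empty_window(c3: &[Option<i64>], limit: usize) -> Vec<WindowRow> {
    // Like SQL aggregates, sum/avg/count/max/min skip NULLs.
    let non_null: Vec<i64> = c3.iter().flatten().copied().collect();
    let count = non_null.len() as u64;
    let sum = if non_null.is_empty() {
        None
    } else {
        Some(non_null.iter().sum::<i64>())
    };
    let avg = sum.map(|s| s as f64 / count as f64);
    let max = non_null.iter().copied().max();
    let min = non_null.iter().copied().min();

    (0..c3.len())
        // row_number() follows input order here, since OVER () has no ORDER BY
        .map(|i| WindowRow { row_number: i as u64 + 1, sum, avg, count, max, min })
        .take(limit)
        .collect()
}
```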

##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -186,10 +271,259 @@ impl ExecutionPlan for WindowAggExec {
             ));
         }
 
-        // let input = self.input.execute(0).await?;
+        let input = self.input.execute(partition).await?;
+
+        let stream = Box::pin(WindowAggStream::new(
+            self.schema.clone(),
+            self.window_expr.clone(),
+            input,
+        ));
+        Ok(stream)
+    }
+}
+
+pin_project! {
+    /// stream for window aggregation plan
+    pub struct WindowAggStream {
+        schema: SchemaRef,
+        #[pin]
+        output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
+        finished: bool,
+    }
+}
+
+type WindowAccumulatorItem = Box<dyn WindowAccumulator>;
+
+fn window_expressions(
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
+    Ok(window_expr
+        .iter()
+        .map(|expr| expr.expressions())
+        .collect::<Vec<_>>())
+}
+
+fn window_aggregate_batch(
+    batch: &RecordBatch,
+    window_accumulators: &mut [WindowAccumulatorItem],
+    expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<Vec<Option<ArrayRef>>> {
+    // 1.1 iterate accumulators and respective expressions together
+    // 1.2 evaluate expressions
+    // 1.3 update / merge window accumulators with the expressions' values
+
+    // 1.1
+    window_accumulators
+        .iter_mut()
+        .zip(expressions)
+        .map(|(window_acc, expr)| {
+            // 1.2
+            let values = &expr
+                .iter()
+                .map(|e| e.evaluate(batch))
+                .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+                .collect::<Result<Vec<_>>>()?;
+
+            window_acc.scan_batch(batch.num_rows(), values)
+        })
+        .into_iter()
+        .collect::<Result<Vec<_>>>()
+}
+
+/// returns a vector of ArrayRefs, where each entry corresponds to either the
+/// final value (mode = Final) or states (mode = Partial)

Review comment:
       this comment looks a little out of date
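   The three steps in `window_aggregate_batch` (pair accumulators with their expressions, evaluate the expressions, feed the values to `scan_batch`) can be sketched with std-only stand-ins. `Batch`, `Expr`, `column`, `BatchAccumulator`, `RowNumber` and `Sum` are hypothetical simplifications of `RecordBatch`, `PhysicalExpr` and `WindowAccumulator`, not DataFusion APIs.

```rust
// Hypothetical std-only stand-ins; none of these names are DataFusion APIs.

/// Stand-in for a `RecordBatch`: a set of equal-length i64 columns.
pub struct Batch {
    pub columns: Vec<Vec<i64>>,
}

impl Batch {
    pub fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, |c| c.len())
    }
}

/// Stand-in for `PhysicalExpr::evaluate` followed by `into_array`.
pub type Expr = Box<dyn Fn(&Batch) -> Result<Vec<i64>, String>>;

/// Expression that returns column `i`, or an error if it does not exist.
pub fn column(i: usize) -> Expr {
    Box::new(move |b: &Batch| {
        b.columns
            .get(i)
            .cloned()
            .ok_or_else(|| format!("no column {}", i))
    })
}

/// Stand-in for the `scan_batch` half of a window accumulator.
pub trait BatchAccumulator {
    fn scan_batch(&mut self, num_rows: usize, values: &[Vec<i64>])
        -> Result<Option<Vec<i64>>, String>;
}

/// Emits 1..=n across successive batches, like `row_number()`.
#[derive(Default)]
pub struct RowNumber {
    seen: i64,
}

impl BatchAccumulator for RowNumber {
    fn scan_batch(&mut self, num_rows: usize, _values: &[Vec<i64>])
        -> Result<Option<Vec<i64>>, String> {
        let start = self.seen;
        self.seen += num_rows as i64;
        Ok(Some((start + 1..=self.seen).collect()))
    }
}

/// Accumulates a total and emits nothing per batch, like `sum(x) over ()`.
#[derive(Default)]
pub struct Sum {
    total: i64,
}

impl BatchAccumulator for Sum {
    fn scan_batch(&mut self, _num_rows: usize, values: &[Vec<i64>])
        -> Result<Option<Vec<i64>>, String> {
        let col = values.first().ok_or("sum expects one argument")?;
        self.total += col.iter().sum::<i64>();
        Ok(None)
    }
}

pub fn window_aggregate_batch(
    batch: &Batch,
    accumulators: &mut [Box<dyn BatchAccumulator>],
    expressions: &[Vec<Expr>],
) -> Result<Vec<Option<Vec<i64>>>, String> {
    accumulators
        .iter_mut()
        // 1.1 pair each accumulator with its argument expressions
        .zip(expressions)
        .map(|(acc, exprs)| {
            // 1.2 evaluate every argument expression against the batch
            let values = exprs
                .iter()
                .map(|e| e(batch))
                .collect::<Result<Vec<_>, _>>()?;
            // 1.3 hand the evaluated columns to the accumulator
            acc.scan_batch(batch.num_rows(), &values)
        })
        // collecting into Result stops at the first Err
        .collect()
}
```

   Collecting an iterator of `Result`s into `Result<Vec<_>, _>` returns the first error it meets, which is what lets the closure use `?`. The trailing `.into_iter()` in the diff is a no-op on a value that is already an iterator, so the sketch omits it.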




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org