Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/24 18:43:59 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #375: add window expression stream, delegated window aggregation, and a basic structure for row_number

alamb commented on a change in pull request #375:
URL: https://github.com/apache/arrow-datafusion/pull/375#discussion_r638165674



##########
File path: datafusion/src/physical_plan/expressions/row_number.rs
##########
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::error::Result;
+use crate::physical_plan::{BuiltInWindowFunctionExpr, PhysicalExpr};
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::sync::Arc;
+
+/// row_number expression
+#[derive(Debug)]
+pub struct RowNumber {
+    name: String,
+}
+
+impl RowNumber {
+    /// Create a new MAX aggregate function

Review comment:
       ```suggestion
       /// Create a new ROW_NUMBER function
   ```

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -457,10 +457,41 @@ pub trait WindowExpr: Send + Sync + Debug {
     fn name(&self) -> &str {
         "WindowExpr: default name"
     }
+
+    /// the accumulator used to accumulate values from the expressions.
+    /// the accumulator expects the same number of arguments as `expressions` and must
+    /// return states with the same description as `state_fields`
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>>;
+
+    /// expressions that are passed to the WindowAccumulator.
+    /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.

Review comment:
       ```suggestion
       /// Functions which take a single input argument, such as `sum`, return a single [`Expr`], 
       /// others (e.g. `cov`) return many.
   ```

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -509,6 +540,43 @@ pub trait Accumulator: Send + Sync + Debug {
     fn evaluate(&self) -> Result<ScalarValue>;
 }
 
+/// A window accumulator represents a stateful object that lives throughout the evaluation of multiple
+/// rows and generically accumulates values.
+///
+/// An accumulator knows how to:
+/// * update its state from inputs via `update`
+/// * convert its internal state to a vector of scalar values
+/// * update its state from multiple accumulators' states via `merge`
+/// * compute the final value from its internal state via `evaluate`
+pub trait WindowAccumulator: Send + Sync + Debug {
+    /// scans the accumulator's state from a vector of scalars, similar to Accumulator it also
+    /// optionally generates values.
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>>;
+
+    /// scans the accumulator's state from a vector of arrays.
+    fn scan_batch(&mut self, values: &[ArrayRef]) -> Result<Option<Vec<ScalarValue>>> {

Review comment:
       Following up on @Dandandan's comment https://github.com/apache/arrow-datafusion/pull/380#discussion_r638136080: since Arrow is already column-based, creating `Vec`s of `ScalarValue`s seems like it may limit performance.
   
   What would you think about using [slice](https://docs.rs/arrow/4.1.0/arrow/array/trait.Array.html#method.slice) to pick subsets of arrays, rather than building a `Vec<ScalarValue>`?
   
   I realize this code follows the model of `Accumulator`, so I think it would also be OK to continue in this style; we can go back and rewrite it in terms of `Array` later if needed.
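
   To illustrate the difference, here is a minimal std-only sketch. `Scalar` is a hypothetical stand-in for `ScalarValue`, and a plain `&[i64]` stands in for an Arrow array; neither is the real DataFusion or Arrow type. The first function wraps every row in a scalar before summing, while the second borrows a sub-range of the column without copying, loosely analogous to `Array::slice(offset, length)`.

   ```rust
   /// Hypothetical stand-in for a per-row value such as `ScalarValue`.
   #[derive(Debug, Clone, PartialEq)]
   enum Scalar {
       Int64(Option<i64>),
   }

   /// Row-at-a-time style: every row is first wrapped in a `Scalar`,
   /// which allocates a `Vec` and adds a match per value.
   fn sum_via_scalars(column: &[i64]) -> i64 {
       let scalars: Vec<Scalar> = column
           .iter()
           .map(|v| Scalar::Int64(Some(*v)))
           .collect();
       scalars
           .iter()
           .map(|s| match s {
               Scalar::Int64(Some(v)) => *v,
               Scalar::Int64(None) => 0,
           })
           .sum()
   }

   /// Columnar style: borrow `length` values starting at `offset`
   /// and operate on them directly, with no per-row wrapping.
   fn sum_via_slice(column: &[i64], offset: usize, length: usize) -> i64 {
       column[offset..offset + length].iter().sum()
   }
   ```

   Both functions compute the same kind of result; the point is only that the slice version never materializes an intermediate vector of scalars.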
   

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -457,10 +457,41 @@ pub trait WindowExpr: Send + Sync + Debug {
     fn name(&self) -> &str {
         "WindowExpr: default name"
     }
+
+    /// the accumulator used to accumulate values from the expressions.
+    /// the accumulator expects the same number of arguments as `expressions` and must
+    /// return states with the same description as `state_fields`
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>>;
+
+    /// expressions that are passed to the WindowAccumulator.
+    /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+}
+
+/// A window expression that is a built-in window function
+pub trait BuiltInWindowFunctionExpr: Send + Sync + Debug {

Review comment:
       
   `BuiltInWindowFunctionExpr` appears to be a subset of `WindowExpr` -- what is the purpose of having both? I understand `WindowExpr`, but I don't understand `BuiltInWindowFunctionExpr`.

##########
File path: datafusion/src/physical_plan/expressions/row_number.rs
##########
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution

Review comment:
       ```suggestion
   //! Defines the physical expression for `row_number` that can be evaluated at runtime during query execution
   ```

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -509,6 +540,43 @@ pub trait Accumulator: Send + Sync + Debug {
     fn evaluate(&self) -> Result<ScalarValue>;
 }
 
+/// A window accumulator represents a stateful object that lives throughout the evaluation of multiple
+/// rows and generically accumulates values.
+///
+/// An accumulator knows how to:
+/// * update its state from inputs via `update`
+/// * convert its internal state to a vector of scalar values
+/// * update its state from multiple accumulators' states via `merge`
+/// * compute the final value from its internal state via `evaluate`
+pub trait WindowAccumulator: Send + Sync + Debug {
+    /// scans the accumulator's state from a vector of scalars, similar to Accumulator it also
+    /// optionally generates values.
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>>;

Review comment:
       It would help me if you could explain where the "window" for the window function appears in this trait. I assume you already have a design in mind, so I figured I would ask here.
   
   I am thinking about a query like the following:
   
   ```
   select sum(value) OVER (ROWS 5 PRECEDING) FROM ....
   ```
   
   I think in this case you end up with 9 aggregate values from 9 different windows (one per input row), in the following manner:
   
   ```
                                                        
                     1 2 3 4 5 6 7 8 9       input      
                                                        
       window 1      ─                                  
       window 2      ───                                
       window 3      ─────                              
       window 4      ───────                            
       window 5      ─────────                          
       window 6        ─────────                        
       window 7          ─────────                      
       window 8            ─────────                    
       window 9              ─────────                  
   ```
   
   I would have expected the `WindowAccumulator` to have functions like 
   
   ```rust
   /// Add a new row to the current window
   fn new_row_in_window(&mut self, value: ScalarValue);
   
   /// Remove a row from the current window
   fn remove_row_from_window(&mut self, value: ScalarValue);
   
   /// The current value of this function for the given window
   fn current_value(&self) -> ScalarValue;
   ```
   
   Or possibly something like 
   
   ```rust
   fn evaluate(&self, window: &[ArrayRef]) -> ScalarValue;
   ```
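
   As a rough illustration of both shapes, here is a std-only sketch for a sliding `sum`. Everything in it is hypothetical: `SlidingSum`, `sliding_sums`, and `sliding_sums_by_slice` are made-up names, and `i64` / `&[i64]` stand in for `ScalarValue` / `ArrayRef`. The `preceding` parameter counts rows before the current row, so the five-row-wide windows drawn in the diagram correspond to `preceding = 4`.

   ```rust
   use std::collections::VecDeque;

   /// Hypothetical incremental accumulator for a sliding `sum`.
   struct SlidingSum {
       window: VecDeque<i64>,
       sum: i64,
   }

   impl SlidingSum {
       fn new() -> Self {
           Self { window: VecDeque::new(), sum: 0 }
       }

       /// Add a new row to the current window.
       fn new_row_in_window(&mut self, value: i64) {
           self.window.push_back(value);
           self.sum += value;
       }

       /// Remove the oldest row from the current window.
       fn remove_row_from_window(&mut self) {
           if let Some(old) = self.window.pop_front() {
               self.sum -= old;
           }
       }

       /// The current value of the function for the current window.
       fn current_value(&self) -> i64 {
           self.sum
       }
   }

   /// Incremental style: one output per input row, where each window holds
   /// the current row plus up to `preceding` earlier rows.
   fn sliding_sums(input: &[i64], preceding: usize) -> Vec<i64> {
       let mut acc = SlidingSum::new();
       let mut out = Vec::with_capacity(input.len());
       for &value in input {
           acc.new_row_in_window(value);
           if acc.window.len() > preceding + 1 {
               acc.remove_row_from_window();
           }
           out.push(acc.current_value());
       }
       out
   }

   /// Slice style: re-evaluate each window from a borrowed sub-range.
   fn evaluate(window: &[i64]) -> i64 {
       window.iter().sum()
   }

   fn sliding_sums_by_slice(input: &[i64], preceding: usize) -> Vec<i64> {
       (0..input.len())
           .map(|i| evaluate(&input[i.saturating_sub(preceding)..=i]))
           .collect()
   }
   ```

   For the nine input rows in the diagram and `preceding = 4`, both functions produce 1, 3, 6, 10, 15, 20, 25, 30, 35. The incremental version does constant work per row, while the slice version is simpler but re-reads each window in full.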
   

##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -186,10 +273,260 @@ impl ExecutionPlan for WindowAggExec {
             ));
         }
 
-        // let input = self.input.execute(0).await?;
+        let input = self.input.execute(partition).await?;
 
-        Err(DataFusionError::NotImplemented(
-            "WindowAggExec::execute".to_owned(),
-        ))
+        let stream = Box::pin(WindowAggStream::new(
+            self.schema.clone(),
+            self.window_expr.clone(),
+            input,
+        ));
+        Ok(stream)
+    }
+}
+
+pin_project! {
+    /// stream for window aggregation plan
+    pub struct WindowAggStream {
+        schema: SchemaRef,
+        #[pin]
+        output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
+        finished: bool,
+    }
+}
+
+type WindowAccumulatorItem = Box<dyn WindowAccumulator>;
+
+fn window_expressions(
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
+    Ok(window_expr
+        .iter()
+        .map(|expr| expr.expressions())
+        .collect::<Vec<_>>())
+}
+
+fn window_aggregate_batch(
+    batch: &RecordBatch,
+    window_accumulators: &mut [WindowAccumulatorItem],
+    expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<Vec<Option<Vec<ScalarValue>>>> {
+    // 1.1 iterate accumulators and respective expressions together
+    // 1.2 evaluate expressions
+    // 1.3 update / merge window accumulators with the expressions' values
+
+    // 1.1
+    window_accumulators
+        .iter_mut()
+        .zip(expressions)
+        .map(|(window_acc, expr)| {
+            // 1.2
+            let values = &expr
+                .iter()
+                .map(|e| e.evaluate(batch))
+                .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+                .collect::<Result<Vec<_>>>()?;
+
+            window_acc.scan_batch(values)
+        })
+        .into_iter()
+        .collect::<Result<Vec<_>>>()
+}
+
+/// returns a vector of ArrayRefs, where each entry corresponds to either the
+/// final value (mode = Final) or states (mode = Partial)
+fn finalize_window_aggregation(
+    window_accumulators: &[WindowAccumulatorItem],
+) -> Result<Vec<Option<ScalarValue>>> {
+    window_accumulators
+        .iter()
+        .map(|window_accumulator| window_accumulator.evaluate())
+        .collect::<Result<Vec<_>>>()
+}
+
+fn create_window_accumulators(
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Vec<WindowAccumulatorItem>> {
+    window_expr
+        .iter()
+        .map(|expr| expr.create_accumulator())
+        .collect::<Result<Vec<_>>>()
+}
+
+async fn compute_window_aggregate(
+    schema: SchemaRef,
+    window_expr: Vec<Arc<dyn WindowExpr>>,
+    mut input: SendableRecordBatchStream,
+) -> ArrowResult<RecordBatch> {
+    let mut window_accumulators = create_window_accumulators(&window_expr)
+        .map_err(DataFusionError::into_arrow_external_error)?;
+
+    let expressions = window_expressions(&window_expr)
+        .map_err(DataFusionError::into_arrow_external_error)?;
+
+    let expressions = Arc::new(expressions);
+
+    // TODO each element shall have some size hint
+    let mut accumulator: Vec<Vec<ScalarValue>> =
+        iter::repeat(vec![]).take(window_expr.len()).collect();
+
+    let mut original_batches: Vec<RecordBatch> = vec![];
+
+    let mut total_num_rows = 0;
+
+    while let Some(batch) = input.next().await {
+        let batch = batch?;
+        total_num_rows += batch.num_rows();
+        original_batches.push(batch.clone());
+
+        let batch_aggregated =
+            window_aggregate_batch(&batch, &mut window_accumulators, &expressions)
+                .map_err(DataFusionError::into_arrow_external_error)?;
+        accumulator.iter_mut().zip(batch_aggregated).for_each(
+            |(acc_for_window, window_batch)| {
+                if let Some(data) = window_batch {
+                    acc_for_window.extend(data);
+                }
+            },
+        );
+    }
+
+    let aggregated_mapped = finalize_window_aggregation(&window_accumulators)
+        .map_err(DataFusionError::into_arrow_external_error)?;
+
+    let mut columns: Vec<ArrayRef> = accumulator
+        .iter()
+        .zip(aggregated_mapped)
+        .map(|(acc, agg)| {
+            let arr: ArrayRef = match (acc, agg) {
+                (acc, Some(scalar_value)) if acc.is_empty() => {
+                    scalar_value.to_array_of_size(total_num_rows)
+                }
+                (acc, None) if !acc.is_empty() => {
+                    return Err(DataFusionError::NotImplemented(
+                        "built in window function not yet implemented".to_owned(),
+                    ))
+                }
+                _ => {
+                    return Err(DataFusionError::Execution(
+                        "invalid window function behavior".to_owned(),
+                    ))
+                }
+            };
+            Ok(arr)
+        })
+        .collect::<Result<Vec<ArrayRef>>>()
+        .map_err(DataFusionError::into_arrow_external_error)?;
+
+    for i in 0..(schema.fields().len() - window_expr.len()) {
+        let col = concat(
+            &original_batches
+                .iter()
+                .map(|batch| batch.column(i).as_ref())
+                .collect::<Vec<_>>(),
+        )?;
+        columns.push(col);
     }
+
+    RecordBatch::try_new(schema.clone(), columns)
+}
+
+impl WindowAggStream {
+    /// Create a new WindowAggStream
+    pub fn new(
+        schema: SchemaRef,
+        window_expr: Vec<Arc<dyn WindowExpr>>,
+        input: SendableRecordBatchStream,
+    ) -> Self {
+        let (tx, rx) = futures::channel::oneshot::channel();
+        let schema_clone = schema.clone();
+        tokio::spawn(async move {
+            let result = compute_window_aggregate(schema_clone, window_expr, input).await;
+            tx.send(result)
+        });
+
+        Self {
+            output: rx,
+            finished: false,
+            schema,
+        }
+    }
+}
+
+impl Stream for WindowAggStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if self.finished {
+            return Poll::Ready(None);
+        }
+
+        // is the output ready?
+        let this = self.project();
+        let output_poll = this.output.poll(cx);
+
+        match output_poll {
+            Poll::Ready(result) => {
+                *this.finished = true;
+                // check for error in receiving channel and unwrap actual result
+                let result = match result {
+                    Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
+                    Ok(result) => Some(result),
+                };
+                Poll::Ready(result)
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl RecordBatchStream for WindowAggStream {
+    /// Get the schema
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    // use super::*;

Review comment:
       I think tests would be good to have
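
   As a sketch of the kind of check that might go here, the following std-only example tests that row numbers keep counting across batches. `RowNumberCounter` and its `scan_batch(num_rows)` signature are hypothetical stand-ins, not the accumulator from this PR; a real test would run against `WindowAggExec` and `RecordBatch` inputs.

   ```rust
   /// Hypothetical stand-in for a `row_number` accumulator.
   #[derive(Default)]
   struct RowNumberCounter {
       next: u64,
   }

   impl RowNumberCounter {
       /// Assign consecutive 1-based row numbers to a batch of `num_rows` rows,
       /// continuing from wherever the previous batch stopped.
       fn scan_batch(&mut self, num_rows: usize) -> Vec<u64> {
           (0..num_rows)
               .map(|_| {
                   self.next += 1;
                   self.next
               })
               .collect()
       }
   }

   #[cfg(test)]
   mod tests {
       use super::*;

       #[test]
       fn row_number_continues_across_batches() {
           let mut counter = RowNumberCounter::default();
           assert_eq!(counter.scan_batch(3), vec![1, 2, 3]);
           assert_eq!(counter.scan_batch(2), vec![4, 5]);
       }
   }
   ```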



