You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/14 16:06:41 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #520: Implement window functions with `order_by` clause

alamb commented on a change in pull request #520:
URL: https://github.com/apache/arrow-datafusion/pull/520#discussion_r651068287



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -113,54 +111,32 @@ impl BuiltInWindowFunctionExpr for NthValue {
         &self.name
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        Ok(Box::new(NthValueAccumulator::try_new(
-            self.kind,
-            self.data_type.clone(),
-        )?))
-    }
-}
-
-#[derive(Debug)]
-struct NthValueAccumulator {
-    kind: NthValueKind,
-    offset: u32,
-    value: ScalarValue,
-}
-
-impl NthValueAccumulator {
-    /// new count accumulator
-    pub fn try_new(kind: NthValueKind, data_type: DataType) -> Result<Self> {
-        Ok(Self {
-            kind,
-            offset: 0,
-            // null value of that data_type by default
-            value: ScalarValue::try_from(&data_type)?,
-        })
-    }
-}
-
-impl WindowAccumulator for NthValueAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.offset += 1;
-        match self.kind {
-            NthValueKind::Last => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::First if self.offset == 1 => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::Nth(n) if self.offset == n => {
-                self.value = values[0].clone();
-            }
-            _ => {}
+    fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result<ArrayRef> {
+        if values.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No arguments supplied to {}",
+                self.name()
+            )));
         }
-
-        Ok(None)
-    }
-
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.value.clone()))
+        let value = &values[0];
+        if value.len() != num_rows {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid data supplied to {}, expect {} rows, got {} rows",
+                self.name(),
+                num_rows,
+                value.len()
+            )));
+        }
+        if num_rows == 0 {

Review comment:
       Could this function could ever be passed a 0 row input? This check isn't a problem I am just wondering if my mental model is correct. 

##########
File path: integration-tests/sqls/simple_window_ordered_aggregation.sql
##########
@@ -0,0 +1,26 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+
+-- http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language gOVERning permissions and
+-- limitations under the License.
+
+SELECT

Review comment:
       Very nice 👍 

##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -1083,15 +1083,15 @@ fn dictionary_create_group_by_value<K: ArrowDictionaryKeyType>(
     let dict_col = col.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
 
     // look up the index in the values dictionary
-    let keys_col = dict_col.keys_array();
+    let keys_col = dict_col.keys();

Review comment:
       This change in API was not published with arrow 4.3 (it will be in arrow 5.0) but it is cool to leave the changes in this PR anyways 👍 

##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -202,9 +249,55 @@ impl WindowExpr for AggregateWindowExpr {
         self.aggregate.expressions()
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        let accumulator = self.aggregate.create_accumulator()?;
-        Ok(Box::new(AggregateWindowAccumulator { accumulator }))
+    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_by
+    }
+
+    fn order_by(&self) -> &[PhysicalSortExpr] {
+        &self.order_by
+    }
+
+    /// evaluate the window function values against the batch
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        // FIXME, for now we assume all the rows belong to the same partition, which will not be the
+        // case when partition_by is supported, in which case we'll parallelize the calls.
+        // See https://github.com/apache/arrow-datafusion/issues/299
+        match self.evaluation_mode() {
+            WindowFrameUnits::Range => self.peer_based_evaluate(batch),
+            WindowFrameUnits::Rows => self.row_based_evaluate(batch),
+            WindowFrameUnits::Groups => self.group_based_evaluate(batch),
+        }
+    }
+}
+
+/// Aggregate window accumulator utilizes the accumulator from aggregation and do a accumulative sum
+/// across evaluation arguments based on peer equivalences.
+#[derive(Debug)]
+struct AggregateWindowAccumulator {
+    accumulator: Box<dyn Accumulator>,
+}
+
+impl AggregateWindowAccumulator {
+    /// scan one peer group of values (as arguments to window function) given by the value_range
+    /// and return evaluation result that are of the same number of rows.
+    fn scan_peers(
+        &mut self,
+        values: &[ArrayRef],
+        value_range: &Range<usize>,
+    ) -> Result<ArrayRef> {
+        if value_range.is_empty() {
+            return Err(DataFusionError::Internal(
+                "Value range cannot be empty".to_owned(),
+            ));
+        }
+        let len = value_range.end - value_range.start;
+        let values = values
+            .iter()
+            .map(|v| v.slice(value_range.start, len))

Review comment:
       👍 

##########
File path: datafusion/src/physical_plan/window_functions.rs
##########
@@ -207,7 +208,10 @@ pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature {
     }
 }
 
-/// A window expression that is a built-in window function
+/// A window expression that is a built-in window function.
+///
+/// Note that unlike aggregation based window functions, built-in window functions normally ignore
+/// window frame spec, with th expression of first_value, last_value, and nth_value.

Review comment:
       ```suggestion
   /// window frame spec, with the exception of first_value, last_value, and nth_value.
   ```

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1299,6 +1309,49 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn window_order_by() -> Result<()> {
+        let results = execute(
+            "SELECT \
+            c1, \
+            c2, \
+            ROW_NUMBER() OVER (ORDER BY c1, c2), \
+            FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \
+            LAST_VALUE(c2) OVER (ORDER BY c1, c2), \
+            NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \
+            SUM(c2) OVER (ORDER BY c1, c2), \
+            COUNT(c2) OVER (ORDER BY c1, c2), \
+            MAX(c2) OVER (ORDER BY c1, c2), \
+            MIN(c2) OVER (ORDER BY c1, c2), \
+            AVG(c2) OVER (ORDER BY c1, c2) \
+            FROM test \
+            ORDER BY c1, c2 \
+            LIMIT 5",
+            4,
+        )
+        .await?;
+        // result in one batch, although e.g. having 2 batches do not change

Review comment:
       👍 

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -113,54 +111,32 @@ impl BuiltInWindowFunctionExpr for NthValue {
         &self.name
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        Ok(Box::new(NthValueAccumulator::try_new(
-            self.kind,
-            self.data_type.clone(),
-        )?))
-    }
-}
-
-#[derive(Debug)]
-struct NthValueAccumulator {
-    kind: NthValueKind,
-    offset: u32,
-    value: ScalarValue,
-}
-
-impl NthValueAccumulator {
-    /// new count accumulator
-    pub fn try_new(kind: NthValueKind, data_type: DataType) -> Result<Self> {
-        Ok(Self {
-            kind,
-            offset: 0,
-            // null value of that data_type by default
-            value: ScalarValue::try_from(&data_type)?,
-        })
-    }
-}
-
-impl WindowAccumulator for NthValueAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.offset += 1;
-        match self.kind {
-            NthValueKind::Last => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::First if self.offset == 1 => {
-                self.value = values[0].clone();
-            }
-            NthValueKind::Nth(n) if self.offset == n => {
-                self.value = values[0].clone();
-            }
-            _ => {}
+    fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result<ArrayRef> {
+        if values.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No arguments supplied to {}",
+                self.name()
+            )));
         }
-
-        Ok(None)
-    }
-
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.value.clone()))
+        let value = &values[0];
+        if value.len() != num_rows {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid data supplied to {}, expect {} rows, got {} rows",
+                self.name(),
+                num_rows,
+                value.len()
+            )));
+        }
+        if num_rows == 0 {
+            return Ok(new_empty_array(value.data_type()));
+        }
+        let index: usize = match self.kind {

Review comment:
       👍 

##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -156,31 +162,72 @@ impl WindowExpr for BuiltInWindowExpr {
         self.window.expressions()
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
-        self.window.create_accumulator()
+    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_by
+    }
+
+    fn order_by(&self) -> &[PhysicalSortExpr] {
+        &self.order_by
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
+        // FIXME, for now we assume all the rows belong to the same partition, which will not be the
+        // case when partition_by is supported, in which case we'll parallelize the calls.
+        // See https://github.com/apache/arrow-datafusion/issues/299
+        let values = self.evaluate_args(batch)?;
+        self.window.evaluate(batch.num_rows(), &values)
     }
 }
 
 /// A window expr that takes the form of an aggregate function
 #[derive(Debug)]
 pub struct AggregateWindowExpr {
     aggregate: Arc<dyn AggregateExpr>,
+    partition_by: Vec<Arc<dyn PhysicalExpr>>,
+    order_by: Vec<PhysicalSortExpr>,
+    window_frame: Option<WindowFrame>,
 }
 
-#[derive(Debug)]
-struct AggregateWindowAccumulator {
-    accumulator: Box<dyn Accumulator>,
-}
+impl AggregateWindowExpr {
+    /// the aggregate window function operates based on window frame, and by default the mode is
+    /// "range".
+    fn evaluation_mode(&self) -> WindowFrameUnits {
+        self.window_frame.unwrap_or_default().units
+    }
 
-impl WindowAccumulator for AggregateWindowAccumulator {
-    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
-        self.accumulator.update(values)?;
-        Ok(None)
+    /// create a new accumulator based on the underlying aggregation function
+    fn create_accumulator(&self) -> Result<AggregateWindowAccumulator> {
+        let accumulator = self.aggregate.create_accumulator()?;
+        Ok(AggregateWindowAccumulator { accumulator })
     }
 
-    /// returns its value based on its current state.
-    fn evaluate(&self) -> Result<Option<ScalarValue>> {
-        Ok(Some(self.accumulator.evaluate()?))
+    /// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
+    /// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same
+    /// results for peers) and concatenate the results.
+    fn peer_based_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {

Review comment:
       I don't understand the naming of `peer` here (rather than `range_based_evaluate` for example, to match with `WindowFrameUnits::Range`)

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -172,68 +148,47 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use arrow::{array::*, datatypes::*};
 
-    fn test_i32_result(expr: Arc<NthValue>, expected: i32) -> Result<()> {
+    fn test_i32_result(expr: NthValue, expected: Vec<i32>) -> Result<()> {
         let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, -2, 3, -4, 5, -6, 7, 8]));
+        let values = vec![arr];

Review comment:
       This test change shows the nice refactoring

##########
File path: datafusion/src/execution/context.rs
##########
@@ -1299,6 +1309,49 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn window_order_by() -> Result<()> {
+        let results = execute(
+            "SELECT \
+            c1, \
+            c2, \
+            ROW_NUMBER() OVER (ORDER BY c1, c2), \
+            FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \
+            LAST_VALUE(c2) OVER (ORDER BY c1, c2), \
+            NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \
+            SUM(c2) OVER (ORDER BY c1, c2), \
+            COUNT(c2) OVER (ORDER BY c1, c2), \
+            MAX(c2) OVER (ORDER BY c1, c2), \
+            MIN(c2) OVER (ORDER BY c1, c2), \
+            AVG(c2) OVER (ORDER BY c1, c2) \
+            FROM test \
+            ORDER BY c1, c2 \
+            LIMIT 5",
+            4,
+        )
+        .await?;
+        // result in one batch, although e.g. having 2 batches do not change
+        // result semantics, having a len=1 assertion upfront keeps surprises
+        // at bay
+        assert_eq!(results.len(), 1);
+
+        let expected = vec![
+            "+----+----+--------------+-----------------+----------------+------------------------+---------+-----------+---------+---------+---------+",
+            "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(c2) | LAST_VALUE(c2) | NTH_VALUE(c2,Int64(2)) | SUM(c2) | COUNT(c2) | MAX(c2) | MIN(c2) | AVG(c2) |",
+            "+----+----+--------------+-----------------+----------------+------------------------+---------+-----------+---------+---------+---------+",
+            "| 0  | 1  | 1            | 1               | 10             | 2                      | 1       | 1         | 1       | 1       | 1       |",

Review comment:
       I double checked these in postgres. 👍 

##########
File path: ballista/rust/core/Cargo.toml
##########
@@ -40,7 +40,7 @@ tokio = "1.0"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
 
-arrow-flight = { version = "4.0"  }
+arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "e5cda312b697c3d610637b28c58b6f1b104b41cc" }

Review comment:
       4.3.0 has been released so we can probably revert this change

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -465,15 +467,65 @@ pub trait WindowExpr: Send + Sync + Debug {
         "WindowExpr: default name"
     }
 
-    /// the accumulator used to accumulate values from the expressions.
-    /// the accumulator expects the same number of arguments as `expressions` and must
-    /// return states with the same description as `state_fields`
-    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>>;
-
     /// expressions that are passed to the WindowAccumulator.

Review comment:
       The `WindowExpr` trait is looking 👌 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org