You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/27 18:01:54 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

alamb commented on a change in pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#discussion_r640837938



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression
+#[derive(Debug)]
+pub struct FirstValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl FirstValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for FirstValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            1,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+// sql values start with 1, so we can use 0 to indicate the special last value behavior
+const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
+
+/// last_value expression
+#[derive(Debug)]
+pub struct LastValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl LastValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for LastValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            SPECIAL_SIZE_VALUE_FOR_LAST,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+/// nth_value expression
+#[derive(Debug)]
+pub struct NthValue {
+    name: String,
+    n: u32,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl NthValue {
+    /// Create a new NTH_VALUE window aggregate function
+    pub fn try_new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: String,
+        n: u32,
+        data_type: DataType,
+    ) -> Result<Self> {
+        if n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            Err(DataFusionError::Execution(
+                "nth_value expect n to be > 0".to_owned(),
+            ))
+        } else {
+            Ok(Self {
+                name,
+                n,
+                data_type,
+                expr,
+            })
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for NthValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            self.n,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically

Review comment:
       I wonder if a more idiomatic Rust way of doing this would be an enum like
   
   ```rust
   enum NthValue {
     First,
     Last,
     Nth(u32)
   }
   
   ```
   
   And then in `NthValueAccumulator::scan` you would have something like
   
   ```rust
   match self.n {
     NthValue::First|NthValue::Nth(1)  => {...}
     NthValue::Last => {...}
   }
   ```
   
   There is nothing wrong with the special value approach either -- I just figured I would point it out

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression

Review comment:
       Given how similar `FirstValue`, `LastValue` and `NthValue` are, would it be possible to just use the `NthValue` struct rather than have three mostly repeated structs?

##########
File path: datafusion/tests/sql.rs
##########
@@ -807,17 +807,20 @@ async fn csv_query_window_with_empty_over() -> Result<()> {
     avg(c3) over (), \
     count(c3) over (), \
     max(c3) over (), \
-    min(c3) over () \
+    min(c3) over (), \
+    first_value(c3) over (), \
+    last_value(c3) over (), \
+    nth_value(c3, 2) over ()
     from aggregate_test_100 \
-    order by c2 \
+    order by c2
     limit 5";
     let actual = execute(&mut ctx, sql).await;
     let expected = vec![
-        vec!["1", "781", "7.81", "100", "125", "-117"],
-        vec!["1", "781", "7.81", "100", "125", "-117"],
-        vec!["1", "781", "7.81", "100", "125", "-117"],
-        vec!["1", "781", "7.81", "100", "125", "-117"],
-        vec!["1", "781", "7.81", "100", "125", "-117"],
+        vec!["1", "781", "7.81", "100", "125", "-117", "1", "30", "-40"],

Review comment:
       I am not sure about these results. "first_value", "last_value" and "nth_value" over an unsorted window (as this query is using) seem undefined to me.
   
   For example, Using `datafusion-cli`:
   
   ```
       CREATE EXTERNAL TABLE aggregate_test_100 (
           c1  VARCHAR NOT NULL,
           c2  INT NOT NULL,
           c3  SMALLINT NOT NULL,
           c4  SMALLINT NOT NULL,
           c5  INT NOT NULL,
           c6  BIGINT NOT NULL,
           c7  SMALLINT NOT NULL,
           c8  INT NOT NULL,
           c9  BIGINT NOT NULL,
           c10 VARCHAR NOT NULL,
           c11 FLOAT NOT NULL,
           c12 DOUBLE NOT NULL,
           c13 VARCHAR NOT NULL
       )
       STORED AS CSV
       WITH HEADER ROW
       LOCATION '/Users/alamb/Software/arrow-datafusion/testing/data/csv/';
   ```
   You can see there are many values for `c3`, and the values of `first_value`, `last_value` and `nth_value` seem to be picking some arbitrary rows: 
   
   ```
   > select c2, c3 from aggregate_test_100 order by c2;
   +----+------+
   | c2 | c3   |
   +----+------+
   | 1  | 29   |
   | 1  | -85  |
   | 1  | 38   |
   | 1  | 57   |
   | 1  | 54   |
   | 1  | 103  |
   | 1  | -98  |
   | 1  | -99  |
   | 1  | -25  |
   | 1  | 36   |
   | 1  | 41   |
   | 1  | -8   |
   | 1  | -24  |
   | 1  | 125  |
   | 1  | 70   |
   | 1  | -72  |
   | 1  | 71   |
   | 1  | -56  |
   | 1  | -5   |
   | 1  | 12   |
   | 1  | 83   |
   | 1  | 120  |
   | 2  | 1    |
   | 2  | 113  |
   | 2  | 49   |
   | 2  | 97   |
   | 2  | -29  |
   | 2  | 45   |
   | 2  | -60  |
   | 2  | 93   |
   | 2  | 63   |
   | 2  | 52   |
   | 2  | 31   |
   | 2  | -106 |
   | 2  | -60  |
   | 2  | 68   |
   | 2  | -61  |
   | 2  | 122  |
   | 2  | -48  |
   | 2  | 52   |
   | 2  | -117 |
   | 2  | 29   |
   | 2  | -107 |
   | 2  | -43  |
   | 3  | 104  |
   | 3  | 13   |
   | 3  | 112  |
   | 3  | 77   |
   | 3  | 17   |
   | 3  | 13   |
   | 3  | 73   |
   | 3  | -2   |
   | 3  | 22   |
   | 3  | 17   |
   | 3  | -76  |
   | 3  | 71   |
   | 3  | 14   |
   | 3  | -12  |
   | 3  | -72  |
   | 3  | 97   |
   | 3  | -101 |
   | 3  | -95  |
   | 3  | 123  |
   | 4  | -111 |
   | 4  | -38  |
   | 4  | -54  |
   | 4  | -56  |
   | 4  | -53  |
   | 4  | 123  |
   | 4  | 97   |
   | 4  | 102  |
   | 4  | 65   |
   | 4  | 17   |
   | 4  | 55   |
   | 4  | 73   |
   | 4  | -117 |
   | 4  | -101 |
   | 4  | -79  |
   | 4  | 74   |
   | 4  | 96   |
   | 4  | -90  |
   | 4  | -59  |
   | 4  | 3    |
   | 4  | 5    |
   | 4  | 47   |
   | 4  | 30   |
   | 5  | -40  |
   | 5  | -82  |
   | 5  | 36   |
   | 5  | -31  |
   | 5  | -5   |
   | 5  | 68   |
   | 5  | -59  |
   | 5  | 62   |
   | 5  | -94  |
   | 5  | 64   |
   | 5  | -86  |
   | 5  | 118  |
   | 5  | -101 |
   | 5  | -44  |
   +----+------+
   ```
   
   It is not clear that `1`, `30` and `-40` are the "right" answers (there is no good answer for this dataset)

##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -484,45 +515,106 @@ impl RecordBatchStream for WindowAggStream {
 
 #[cfg(test)]
 mod tests {
-    // use super::*;
-
-    // /// some mock data to test windows
-    // fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
-    //     // define a schema.
-    //     let schema = Arc::new(Schema::new(vec![
-    //         Field::new("a", DataType::UInt32, false),
-    //         Field::new("b", DataType::Float64, false),
-    //     ]));
-
-    //     // define data.
-    //     (
-    //         schema.clone(),
-    //         vec![
-    //             RecordBatch::try_new(
-    //                 schema.clone(),
-    //                 vec![
-    //                     Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
-    //                     Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
-    //                 ],
-    //             )
-    //             .unwrap(),
-    //             RecordBatch::try_new(
-    //                 schema,
-    //                 vec![
-    //                     Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
-    //                     Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
-    //                 ],
-    //             )
-    //             .unwrap(),
-    //         ],
-    //     )
-    // }
-
-    // #[tokio::test]
-    // async fn window_function() -> Result<()> {
-    //     let input: Arc<dyn ExecutionPlan> = unimplemented!();
-    //     let input_schema = input.schema();
-    //     let window_expr = vec![];
-    //     WindowAggExec::try_new(window_expr, input, input_schema);
-    // }
+    use super::*;
+    use crate::physical_plan::aggregates::AggregateFunction;
+    use crate::physical_plan::collect;
+    use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
+    use crate::physical_plan::expressions::col;
+    use crate::test;
+    use arrow::array::*;
+
+    fn create_test_schema(partitions: usize) -> Result<(Arc<CsvExec>, SchemaRef)> {
+        let schema = test::aggr_test_schema();
+        let path = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
+        let csv = CsvExec::try_new(
+            &path,
+            CsvReadOptions::new().schema(&schema),
+            None,
+            1024,
+            None,
+        )?;
+
+        let input = Arc::new(csv);
+        Ok((input, schema))
+    }
+
+    #[tokio::test]
+    async fn window_function_input_partition() -> Result<()> {
+        let (input, schema) = create_test_schema(4)?;
+
+        let window_exec = Arc::new(WindowAggExec::try_new(
+            vec![create_window_expr(
+                &WindowFunction::AggregateFunction(AggregateFunction::Count),
+                &[col("c3")],
+                schema.as_ref(),
+                "count".to_owned(),
+            )?],
+            input,
+            schema.clone(),
+        )?);
+
+        let result = collect(window_exec).await;
+
+        assert!(result.is_err());
+        if let Some(DataFusionError::Internal(msg)) = result.err() {
+            assert_eq!(
+                msg,
+                "WindowAggExec requires a single input partition".to_owned()
+            );
+        } else {
+            unreachable!("Expect an internal error to happen");
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn window_function() -> Result<()> {
+        let (input, schema) = create_test_schema(1)?;

Review comment:
       I think it would be valuable to create a test with more than one partition (so that the data may not arrive at the `WindowFunction` as a single `RecordBatch`)

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression
+#[derive(Debug)]
+pub struct FirstValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl FirstValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for FirstValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            1,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+// sql values start with 1, so we can use 0 to indicate the special last value behavior
+const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
+
+/// last_value expression
+#[derive(Debug)]
+pub struct LastValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl LastValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for LastValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            SPECIAL_SIZE_VALUE_FOR_LAST,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+/// nth_value expression
+#[derive(Debug)]
+pub struct NthValue {
+    name: String,
+    n: u32,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl NthValue {
+    /// Create a new NTH_VALUE window aggregate function
+    pub fn try_new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: String,
+        n: u32,
+        data_type: DataType,
+    ) -> Result<Self> {
+        if n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            Err(DataFusionError::Execution(
+                "nth_value expect n to be > 0".to_owned(),
+            ))
+        } else {
+            Ok(Self {
+                name,
+                n,
+                data_type,
+                expr,
+            })
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for NthValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            self.n,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically
+    // means last; also note that it is totally valid for n to be larger than the number of rows input
+    // in which case all the values shall be null
+    n: u32,
+    offset: u32,
+    value: ScalarValue,
+}
+
+impl NthValueAccumulator {
+    /// new count accumulator
+    pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
+        Ok(Self {
+            n,
+            offset: 0,
+            // null value of that data_type by default
+            value: ScalarValue::try_from(&data_type)?,
+        })
+    }
+}
+
+impl WindowAccumulator for NthValueAccumulator {
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
+        if self.n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            // for last_value function
+            self.value = values[0].clone();

Review comment:
       Should this be `values.last()` rather than the first (`0`th) value?

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression
+#[derive(Debug)]
+pub struct FirstValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl FirstValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for FirstValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            1,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+// sql values start with 1, so we can use 0 to indicate the special last value behavior
+const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
+
+/// last_value expression
+#[derive(Debug)]
+pub struct LastValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl LastValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for LastValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            SPECIAL_SIZE_VALUE_FOR_LAST,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+/// nth_value expression
+#[derive(Debug)]
+pub struct NthValue {
+    name: String,
+    n: u32,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl NthValue {
+    /// Create a new NTH_VALUE window aggregate function
+    pub fn try_new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: String,
+        n: u32,
+        data_type: DataType,
+    ) -> Result<Self> {
+        if n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            Err(DataFusionError::Execution(
+                "nth_value expect n to be > 0".to_owned(),
+            ))
+        } else {
+            Ok(Self {
+                name,
+                n,
+                data_type,
+                expr,
+            })
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for NthValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            self.n,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically
+    // means last; also note that it is totally valid for n to be larger than the number of rows input
+    // in which case all the values shall be null
+    n: u32,
+    offset: u32,
+    value: ScalarValue,
+}
+
+impl NthValueAccumulator {
+    /// new count accumulator
+    pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
+        Ok(Self {
+            n,
+            offset: 0,
+            // null value of that data_type by default
+            value: ScalarValue::try_from(&data_type)?,
+        })
+    }
+}
+
+impl WindowAccumulator for NthValueAccumulator {
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {

Review comment:
       Are we guaranteed that `scan()` will see the entire window in a single call? Or would it be possible to see `scan()` called for two different slices?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org