Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/24 01:06:11 UTC

[GitHub] [arrow-datafusion] Jimexist opened a new pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Jimexist opened a new pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403


   # Which issue does this PR close?
   
   Partly closes #298.
   
   # Rationale for this change
   
   Adds three built-in window functions: `first_value`, `last_value`, and `nth_value`.
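As a rough illustration of what the three functions compute, here is a minimal Rust sketch over a single window partition. It assumes the frame is the whole partition (the SQL default when the window has no `ORDER BY`), models SQL NULL as `None`, and uses hypothetical free functions rather than the PR's `FirstValue`/`LastValue`/`NthValue` types. Here `n == 0` simply yields `None`, whereas the PR's `NthValue::try_new` rejects it with an error.

```rust
// Illustrative sketch only: hypothetical helpers, not DataFusion's
// NthValueAccumulator. `None` stands in for SQL NULL, and the frame is
// assumed to be the whole partition.

/// Value of the first row of the partition (NULL if the partition is empty).
fn first_value<T: Clone>(partition: &[Option<T>]) -> Option<T> {
    nth_value(partition, 1)
}

/// Value of the last row of the partition (NULL if the partition is empty).
fn last_value<T: Clone>(partition: &[Option<T>]) -> Option<T> {
    partition.last().cloned().flatten()
}

/// Value of the n-th row, 1-based as in SQL. Yields NULL when n is 0 or
/// larger than the number of rows in the partition.
fn nth_value<T: Clone>(partition: &[Option<T>], n: usize) -> Option<T> {
    if n == 0 {
        return None;
    }
    partition.get(n - 1).cloned().flatten()
}

fn main() {
    let partition = vec![Some(10), None, Some(30)];
    assert_eq!(first_value(&partition), Some(10));
    assert_eq!(last_value(&partition), Some(30));
    assert_eq!(nth_value(&partition, 2), None); // the 2nd row is itself NULL
    assert_eq!(nth_value(&partition, 5), None); // n is beyond the partition
}
```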
   
   # What changes are included in this PR?
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Dandandan commented on pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#issuecomment-850359193


   @Jimexist there is a change to the `parquet-testing` submodule; can you revert it?
   Otherwise it's looking good (I added some comments). I agree with @alamb that we can merge it as is and improve the implementation in follow-up PRs.
   
   Thanks for all the great work so far!!





[GitHub] [arrow-datafusion] alamb commented on pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#issuecomment-850334886


   > I guess it's not arbitrary; rather, it just takes the ordering as is. Once #425 is merged, I'll add a test case that compares against psql to make sure the behavior is consistent.
   
   Sounds good, makes sense. As @Dandandan says, what this means to me is that there is no well-defined "correct result" for the query (so the test ensures the code doesn't crash, but doesn't really ensure it is getting the correct values).
   
   > Of course, once the ordering clause is implemented, the behavior can also be tested the same way, along with some unit tests.
   
   👍
   
   > I plan to implement ordering after this PR and the lead/lag PR are merged, because it requires some structural changes to the planner.
   
   I think that makes sense.





[GitHub] [arrow-datafusion] Jimexist commented on pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Jimexist commented on pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#issuecomment-850049189


   > Thank you @Jimexist. The code structure looks clean to me. Nice work!
   > 
   > However, I am concerned about the correctness of these results. As I understand it, `first_value`, `last_value` and `nth_value` are not well defined unless there is an ordering on the window (without an ordering you can basically get an arbitrary value from the window).
   > 
   > I wonder if it would make sense to implement ordering for windows first, so we can write tests with well-defined output.
   > 
   > I also see a change to the `parquet-testing` module; I wonder if that was intended.
   
   I guess it's not arbitrary; rather, it just takes the ordering as is. Once https://github.com/apache/arrow-datafusion/pull/425 is merged, I'll add a test case that compares against psql to make sure the behavior is consistent.
   
   Of course, once the ordering clause is implemented, the behavior can also be tested the same way, along with some unit tests.
   
   I plan to implement ordering after this PR and the lead/lag PR are merged, because it requires some structural changes to the planner.
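To make the ordering concern concrete, here is a small std-only Rust sketch with a hypothetical `first_value` helper: the same rows arriving in two different physical orders give different `first_value` results, while sorting the rows first (roughly what an `ORDER BY` in the window specification would do) makes the answer deterministic.

```rust
// Sketch: without an ordering, first_value just reflects whatever order the
// rows happen to arrive in. Hypothetical helper; `None` stands in for NULL.

fn first_value<T: Clone>(partition: &[Option<T>]) -> Option<T> {
    partition.first().cloned().flatten()
}

fn main() {
    // The same three rows, delivered in two different physical orders
    // (for example by two different scans or repartitionings).
    let batch_a = vec![Some(3), Some(1), Some(2)];
    let batch_b = vec![Some(2), Some(3), Some(1)];
    assert_ne!(first_value(&batch_a), first_value(&batch_b));

    // Sorting by a key first makes the result independent of arrival order.
    let mut sorted_a = batch_a.clone();
    let mut sorted_b = batch_b.clone();
    sorted_a.sort();
    sorted_b.sort();
    assert_eq!(first_value(&sorted_a), first_value(&sorted_b));
    assert_eq!(first_value(&sorted_a), Some(1));
}
```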





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#discussion_r641481820



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression
+#[derive(Debug)]
+pub struct FirstValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl FirstValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for FirstValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            1,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+// sql values start with 1, so we can use 0 to indicate the special last value behavior
+const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
+
+/// last_value expression
+#[derive(Debug)]
+pub struct LastValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl LastValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for LastValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            SPECIAL_SIZE_VALUE_FOR_LAST,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+/// nth_value expression
+#[derive(Debug)]
+pub struct NthValue {
+    name: String,
+    n: u32,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl NthValue {
+    /// Create a new NTH_VALUE window aggregate function
+    pub fn try_new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: String,
+        n: u32,
+        data_type: DataType,
+    ) -> Result<Self> {
+        if n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            Err(DataFusionError::Execution(
+                "nth_value expect n to be > 0".to_owned(),
+            ))
+        } else {
+            Ok(Self {
+                name,
+                n,
+                data_type,
+                expr,
+            })
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for NthValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            self.n,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically
+    // means last; also note that it is totally valid for n to be larger than the number of rows input
+    // in which case all the values shall be null
+    n: u32,
+    offset: u32,
+    value: ScalarValue,
+}
+
+impl NthValueAccumulator {
+    /// new count accumulator
+    pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
+        Ok(Self {
+            n,
+            offset: 0,
+            // null value of that data_type by default
+            value: ScalarValue::try_from(&data_type)?,
+        })
+    }
+}
+
+impl WindowAccumulator for NthValueAccumulator {
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {

Review comment:
       Future performance improvement idea: rather than converting each row to `ScalarValue` and passing `values: &[ScalarValue]`, we should strive to *slice* into the original arrays (i.e. avoid doing a lot of work and avoid copying data), produce arrays based on offsets, and then use `take` to build new arrays out of all of the indices (in this case just 1, but with smaller windows / partitions etc. this could grow by a lot).
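The slice-and-`take` idea might look roughly like the following. The sketch assumes partition boundaries are already known as `(start, end)` row offsets and that every row sees its whole partition as the frame; `take` here is a std-only stand-in for Arrow's `take` compute kernel, and `nth_value_indices` is a hypothetical helper. Only one index per output row is computed, and values are copied once in the final gather instead of being converted to `ScalarValue` row by row.

```rust
// Sketch of "compute indices from offsets, then gather once".
// `take` is a std-only stand-in for Arrow's take kernel; `None` = NULL.

/// Gather `values[i]` for every index; a `None` index yields a NULL output.
fn take<T: Clone>(values: &[Option<T>], indices: &[Option<usize>]) -> Vec<Option<T>> {
    indices
        .iter()
        .map(|&idx| idx.and_then(|i| values[i].clone()))
        .collect()
}

/// For each output row, the absolute index of its partition's n-th row.
fn nth_value_indices(partitions: &[(usize, usize)], n: usize) -> Vec<Option<usize>> {
    let mut indices = Vec::new();
    for &(start, end) in partitions {
        // Index of the partition's n-th row (1-based), or None when n == 0
        // or the partition has fewer than n rows.
        let target = n
            .checked_sub(1)
            .and_then(|k| start.checked_add(k))
            .filter(|&i| i < end);
        // Whole-partition frame: every row of the partition gets the same value.
        indices.extend(std::iter::repeat(target).take(end - start));
    }
    indices
}

fn main() {
    let values = vec![Some(10), Some(11), Some(20), Some(30), Some(31), Some(32)];
    // Three partitions covering rows 0..2, 2..3 and 3..6.
    let partitions = [(0, 2), (2, 3), (3, 6)];
    let idx = nth_value_indices(&partitions, 2);
    let out = take(&values, &idx);
    assert_eq!(out, vec![Some(11), Some(11), None, Some(31), Some(31), Some(31)]);
}
```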







[GitHub] [arrow-datafusion] alamb commented on pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#issuecomment-849091594


   @Jimexist I plan to review this PR later today or tomorrow.






[GitHub] [arrow-datafusion] alamb merged pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403


   





[GitHub] [arrow-datafusion] Jimexist commented on pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Jimexist commented on pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#issuecomment-850367144


   > @Jimexist there is a change to the `parquet-testing` submodule; can you revert it?
   > 
   > Otherwise it's looking good (I added some comments). I agree with @alamb that we can merge it as is and improve the implementation in follow-up PRs.
   > 
   > Thanks for all the great work so far!!
   
   Thanks! I'll make sure to find a way to test these consistently and keep the behavior on par with psql.
   
   I'm AFK until Monday, so we can either wait until then, or feel free to revert that submodule change for me and merge.





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#discussion_r641480304



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically

Review comment:
       Yeah, I agree that sounds like a more idiomatic approach.
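The thread does not say which idiomatic alternative is meant. One common Rust option, sketched here purely as an assumption, is to replace the `SPECIAL_SIZE_VALUE_FOR_LAST = 0` sentinel with an enum so that "last" is its own variant rather than a magic value of `n`. `NthValueKind` and its methods are hypothetical names, not DataFusion's API.

```rust
// Sketch: encode "which row" as an enum instead of reserving n == 0 as a
// sentinel for last_value. Hypothetical names; `None` stands in for NULL.

#[derive(Debug, Clone, Copy, PartialEq)]
enum NthValueKind {
    First,
    Last,
    /// 1-based row position; `nth` rejects 0.
    Nth(u32),
}

impl NthValueKind {
    /// Validating constructor for the NTH_VALUE case.
    fn nth(n: u32) -> Result<Self, String> {
        if n == 0 {
            Err("nth_value expects n to be > 0".to_owned())
        } else {
            Ok(NthValueKind::Nth(n))
        }
    }

    /// Pick the selected value out of one partition.
    fn evaluate<T: Clone>(self, partition: &[Option<T>]) -> Option<T> {
        let row = match self {
            NthValueKind::First => partition.first(),
            NthValueKind::Last => partition.last(),
            // checked_sub guards against a directly constructed Nth(0).
            NthValueKind::Nth(n) => (n as usize).checked_sub(1).and_then(|i| partition.get(i)),
        };
        row.cloned().flatten()
    }
}

fn main() {
    let partition = vec![Some("a"), Some("b"), Some("c")];
    assert_eq!(NthValueKind::First.evaluate(&partition), Some("a"));
    assert_eq!(NthValueKind::Last.evaluate(&partition), Some("c"));
    assert_eq!(NthValueKind::nth(2).unwrap().evaluate(&partition), Some("b"));
    assert_eq!(NthValueKind::nth(7).unwrap().evaluate(&partition), None);
    assert!(NthValueKind::nth(0).is_err());
}
```

With this shape, the "n must be positive" check lives in one constructor, and `match` forces every variant to be handled explicitly.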







[GitHub] [arrow-datafusion] nevi-me commented on a change in pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#discussion_r637673685



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,221 @@
+//! Defines physical expressions that can evaluated at runtime during query execution

Review comment:
       ```suggestion
   //! Defines physical expressions that can be evaluated at runtime during query execution
   ```







[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#issuecomment-846671204


   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/403?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#403](https://codecov.io/gh/apache/arrow-datafusion/pull/403?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (59663eb) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/174226c086a4838eab2a238853b4871c295c0189?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (174226c) will **increase** coverage by `0.48%`.
   > The diff coverage is `73.23%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-datafusion/pull/403/graphs/tree.svg?width=650&height=150&src=pr&token=JXwWBKD3D9&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-datafusion/pull/403?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #403      +/-   ##
   ==========================================
   + Coverage   74.94%   75.42%   +0.48%     
   ==========================================
     Files         146      148       +2     
     Lines       24314    24595     +281     
   ==========================================
   + Hits        18223    18552     +329     
   + Misses       6091     6043      -48     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/403?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [datafusion/src/physical\_plan/expressions/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9leHByZXNzaW9ucy9tb2QucnM=) | `71.42% <ø> (ø)` | |
   | [...fusion/src/physical\_plan/expressions/row\_number.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9leHByZXNzaW9ucy9yb3dfbnVtYmVyLnJz) | `0.00% <0.00%> (ø)` | |
   | [datafusion/src/physical\_plan/hash\_aggregate.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9oYXNoX2FnZ3JlZ2F0ZS5ycw==) | `85.21% <ø> (ø)` | |
   | [datafusion/src/physical\_plan/sort.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9zb3J0LnJz) | `92.07% <ø> (ø)` | |
   | [datafusion/src/physical\_plan/window\_functions.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi93aW5kb3dfZnVuY3Rpb25zLnJz) | `86.95% <64.70%> (-1.77%)` | :arrow_down: |
   | [datafusion/src/physical\_plan/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9tb2QucnM=) | `80.58% <68.75%> (-2.18%)` | :arrow_down: |
   | [...afusion/src/physical\_plan/expressions/nth\_value.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9leHByZXNzaW9ucy9udGhfdmFsdWUucnM=) | `70.76% <70.76%> (ø)` | |
   | [datafusion/src/physical\_plan/windows.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi93aW5kb3dzLnJz) | `82.12% <88.09%> (+82.12%)` | :arrow_up: |
   | [datafusion/src/execution/context.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvZXhlY3V0aW9uL2NvbnRleHQucnM=) | `92.07% <100.00%> (+0.02%)` | :arrow_up: |
   | [datafusion/src/physical\_plan/planner.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9wbGFubmVyLnJz) | `80.45% <100.00%> (+3.94%)` | :arrow_up: |
   | ... and [15 more](https://codecov.io/gh/apache/arrow-datafusion/pull/403/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/403?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/403?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [174226c...59663eb](https://codecov.io/gh/apache/arrow-datafusion/pull/403?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   



[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#discussion_r641481820



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression
+#[derive(Debug)]
+pub struct FirstValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl FirstValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for FirstValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            1,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+// SQL positions start at 1, so 0 can be used to indicate the special last-value behavior
+const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
+
+/// last_value expression
+#[derive(Debug)]
+pub struct LastValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl LastValue {
+    /// Create a new LAST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for LastValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            SPECIAL_SIZE_VALUE_FOR_LAST,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+/// nth_value expression
+#[derive(Debug)]
+pub struct NthValue {
+    name: String,
+    n: u32,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl NthValue {
+    /// Create a new NTH_VALUE window aggregate function
+    pub fn try_new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: String,
+        n: u32,
+        data_type: DataType,
+    ) -> Result<Self> {
+        if n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            Err(DataFusionError::Execution(
+                "nth_value expects n to be > 0".to_owned(),
+            ))
+        } else {
+            Ok(Self {
+                name,
+                n,
+                data_type,
+                expr,
+            })
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for NthValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            self.n,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically
+    // means last; also note that it is totally valid for n to be larger than the number of rows input
+    // in which case all the values shall be null
+    n: u32,
+    offset: u32,
+    value: ScalarValue,
+}
+
+impl NthValueAccumulator {
+    /// Create a new accumulator for nth_value (also used by first_value and last_value)
+    pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
+        Ok(Self {
+            n,
+            offset: 0,
+            // null value of that data_type by default
+            value: ScalarValue::try_from(&data_type)?,
+        })
+    }
+}
+
+impl WindowAccumulator for NthValueAccumulator {
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {

Review comment:
       Future performance improvement idea: rather than converting each row to `ScalarValue` and passing `values: &[ScalarValue]`, we should strive to *slice* into the original arrays (i.e. avoid doing a lot of work and copying data), produce arrays of offsets, and then use `take` to build the new arrays.
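   A minimal, std-only sketch of the offsets-then-`take` idea, under stated assumptions: columns are modelled as `&[Option<i64>]` instead of Arrow arrays, `take` below is a hypothetical stand-in for Arrow's `take` compute kernel, `nth_value_indices` is a hypothetical helper, and the window frame is assumed to span the whole partition (no `ORDER BY`). Only row indices are computed per partition; the values are then gathered in one pass instead of building one `ScalarValue` per row.
   
   ```rust
   /// Hypothetical stand-in for Arrow's `take` kernel: for each index, gather
   /// `values[i]`; a null (`None`) or out-of-range index yields a null output.
   fn take(values: &[Option<i64>], indices: &[Option<usize>]) -> Vec<Option<i64>> {
       indices
           .iter()
           .map(|&idx| idx.and_then(|i| values.get(i).copied().flatten()))
           .collect()
   }
   
   /// Hypothetical helper: the row index that `nth_value(col, n)` reads for every
   /// row of the partition `start..end`, assuming the frame is the whole partition.
   /// Only indices are produced here; no per-row scalar values are built.
   fn nth_value_indices(start: usize, end: usize, n: usize) -> Vec<Option<usize>> {
       // SQL positions are 1-based; n == 0 or n past the partition end gives null.
       let target = if n >= 1 && start + n - 1 < end {
           Some(start + n - 1)
       } else {
           None
       };
       vec![target; end - start]
   }
   ```
   
   For a batch containing several partitions, the per-partition index vectors could be concatenated and passed to a single `take` call, so the column data is copied only once.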





[GitHub] [arrow-datafusion] Jimexist commented on pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Jimexist commented on pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#issuecomment-850050234


   > > Thank you @Jimexist -- the code structure looks clean to me. Nice work.
   > > 
   > > However, I am concerned about the correctness of these results. As I understand it, `first_value`, `last_value` and `nth_value` are not well defined unless there is an ordering on the window (i.e. without an ordering you can basically get an arbitrary value from the window).
   > > 
   > > I wonder if it would make sense to implement ordering for windows first, so we can write tests with well-defined output.
   > > 
   > > I also see a change to the `parquet-testing` module; I wonder if that was intended.
   > 
   > I guess it's not arbitrary; rather, it just takes the input ordering as is. Once https://github.com/apache/arrow-datafusion/pull/425 is merged, I'll add a test case that compares the results with psql to make sure the behavior is consistent.
   > 
   > Of course, once the ordering clause is implemented, the behavior can be tested the same way, along with some unit tests.
   > 
   > I plan to implement ordering after this PR and the lead/lag PR are merged, because it requires some structural changes to the planner.
   
   See also https://github.com/apache/arrow-datafusion/pull/429
   
   Both can be merged independently first: once the sort clause is implemented, the logic of these two PRs stays unchanged, because sorting happens in a separate physical plan that precedes these window functions and feeds its output directly into them.
   
   Regarding the submodule change: it was not intended, and I will revert it.
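   A toy sketch of both points in this thread, assuming a partition is just a `Vec<i64>` and the frame is the whole partition: without a sort, `first_value` returns whatever row arrives first, and adding a separate sort step in front changes the result without touching the window function itself. `first_value` and `sort_then_first_value` are hypothetical helpers, not DataFusion APIs.
   
   ```rust
   /// Hypothetical FIRST_VALUE over a whole partition: it returns the first row
   /// in the order the rows arrive, so the result depends on the input order.
   fn first_value(rows: &[i64]) -> Option<i64> {
       rows.first().copied()
   }
   
   /// Hypothetical "sort, then window" pipeline: a separate sort step runs first
   /// and feeds its output into the unchanged `first_value` function.
   fn sort_then_first_value(rows: &[i64]) -> Option<i64> {
       let mut sorted = rows.to_vec();
       sorted.sort_unstable();
       first_value(&sorted)
   }
   ```
   
   With input `[3, 1, 2]`, `first_value` yields `Some(3)` while `sort_then_first_value` yields `Some(1)`; only the preceding step differs.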



[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#discussion_r642104014



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically

Review comment:
       to address later: https://github.com/apache/arrow-datafusion/issues/448
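   For reference, the sentinel scheme described in the `NthValueAccumulator` struct comment (`n == 0` means "last", and an `n` larger than the number of rows yields null) can be sketched as follows. This is a simplified model, not the PR's `scan` implementation: it assumes the frame is the whole partition and models a column as `&[Option<i64>]`; `select_nth` and `validate_user_n` are hypothetical helpers, and only the constant name is taken from the diff.
   
   ```rust
   // Sentinel mirroring the constant in the diff: SQL positions start at 1,
   // so 0 is free to mean "last value".
   const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
   
   /// Hypothetical helper: the value selected for a whole-partition frame.
   fn select_nth(rows: &[Option<i64>], n: u32) -> Option<i64> {
       let idx = if n == SPECIAL_SIZE_VALUE_FOR_LAST {
           rows.len().checked_sub(1)? // last row, or null for an empty partition
       } else {
           (n - 1) as usize // 1-based SQL position
       };
       // n beyond the number of rows yields null, as the struct comment states.
       rows.get(idx).copied().flatten()
   }
   
   /// User-supplied n must be rejected when it collides with the sentinel,
   /// which is what `NthValue::try_new` does in the diff.
   fn validate_user_n(n: u32) -> Result<u32, String> {
       if n == SPECIAL_SIZE_VALUE_FOR_LAST {
           Err("nth_value expects n to be > 0".to_owned())
       } else {
           Ok(n)
       }
   }
   ```
   
   The drawback is that the meaning of `0` lives only in a comment and a runtime check, which is the kind of thing a dedicated type could encode instead.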





[GitHub] [arrow-datafusion] Jimexist commented on pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Jimexist commented on pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#issuecomment-851031913


   > @Jimexist there is some change in `parquet-testing`, can you revert that change?
   > Otherwise it's looking good (added some comments). I agree with @alamb that we can merge it as is and improve the implementation in follow-up PRs.
   > 
   > Thanks for all the great work so far!!
   
   @Dandandan this has been reverted.



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#discussion_r640837938



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically

Review comment:
       I wonder if a more idiomatic Rust way of doing this would be an enum like
   
   ```rust
   enum NthValue {
     First,
     Last,
     Nth(u32)
   }
   
   ```
   
   And then in `NthValueAccumulator::scan` you would have something like
   
   ```rust
   match self.n {
     NthValue::First | NthValue::Nth(1) => {...}
     NthValue::Last => {...}
     NthValue::Nth(n) => {...}
   }
   ```
   
   There is nothing wrong with the special value approach either -- I just figured I would point it out
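A minimal sketch of the enum suggestion, assuming the frame is fully materialized as a slice. `NthValueKind`, `nth` and `select` are hypothetical names, not DataFusion API. Validating `n` once in a constructor removes the need for the `SPECIAL_SIZE_VALUE_FOR_LAST` sentinel, and `None` plays the role of SQL NULL when the frame is too short.

```rust
/// Which row of the window frame to return (hypothetical replacement for
/// the SPECIAL_SIZE_VALUE_FOR_LAST sentinel).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NthValueKind {
    First,
    Last,
    /// 1-based position, as in SQL's NTH_VALUE(expr, n)
    Nth(u32),
}

impl NthValueKind {
    /// Validate `n` once, at construction time, instead of comparing
    /// against a magic number later on.
    pub fn nth(n: u32) -> Result<Self, String> {
        if n == 0 {
            Err("nth_value expects n to be > 0".to_owned())
        } else {
            Ok(NthValueKind::Nth(n))
        }
    }

    /// Pick the selected value from a materialized frame; `None` stands in
    /// for NULL when the frame has fewer rows than requested.
    pub fn select<T: Clone>(&self, frame: &[T]) -> Option<T> {
        match *self {
            NthValueKind::First | NthValueKind::Nth(1) => frame.first().cloned(),
            NthValueKind::Last => frame.last().cloned(),
            // checked_sub guards against a directly constructed Nth(0)
            NthValueKind::Nth(n) => (n as usize)
                .checked_sub(1)
                .and_then(|i| frame.get(i))
                .cloned(),
        }
    }
}
```

With this shape the compiler checks that every case is handled, and an out-of-range `n` simply yields NULL, matching the accumulator comment that `n` may exceed the number of input rows.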

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression

Review comment:
       Given how similar `FirstValue`, `LastValue` and `NthValue` are, would it be possible to use just the `NthValue` struct rather than three mostly repeated structs?
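One way the three structs could collapse into one, sketched under assumptions: `DataType` below is a local stand-in so the example compiles without DataFusion, the `expr: Arc<dyn PhysicalExpr>` field is omitted, and the constructor names `first_value`, `last_value` and `nth_value` are hypothetical. The shared trait methods (`field`, `expressions`, `name`, `create_accumulator`) would then be implemented once instead of three times.

```rust
/// Local stand-in for arrow's DataType, only so this sketch compiles.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int64,
    Utf8,
}

/// Which value the window function selects.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Kind {
    First,
    Last,
    Nth(u32),
}

/// A single struct replacing FirstValue, LastValue and NthValue.
#[derive(Debug)]
pub struct NthValue {
    name: String,
    kind: Kind,
    data_type: DataType,
}

impl NthValue {
    pub fn first_value(name: impl Into<String>, data_type: DataType) -> Self {
        Self { name: name.into(), kind: Kind::First, data_type }
    }

    pub fn last_value(name: impl Into<String>, data_type: DataType) -> Self {
        Self { name: name.into(), kind: Kind::Last, data_type }
    }

    /// Only the nth variant can fail, so only it returns a Result.
    pub fn nth_value(name: impl Into<String>, n: u32, data_type: DataType) -> Result<Self, String> {
        if n == 0 {
            return Err("nth_value expects n to be > 0".to_owned());
        }
        Ok(Self { name: name.into(), kind: Kind::Nth(n), data_type })
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn kind(&self) -> Kind {
        self.kind
    }

    pub fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
```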

##########
File path: datafusion/tests/sql.rs
##########
@@ -807,17 +807,20 @@ async fn csv_query_window_with_empty_over() -> Result<()> {
     avg(c3) over (), \
     count(c3) over (), \
     max(c3) over (), \
-    min(c3) over () \
+    min(c3) over (), \
+    first_value(c3) over (), \
+    last_value(c3) over (), \
+    nth_value(c3, 2) over ()
     from aggregate_test_100 \
-    order by c2 \
+    order by c2
     limit 5";
     let actual = execute(&mut ctx, sql).await;
     let expected = vec![
-        vec!["1", "781", "7.81", "100", "125", "-117"],
-        vec!["1", "781", "7.81", "100", "125", "-117"],
-        vec!["1", "781", "7.81", "100", "125", "-117"],
-        vec!["1", "781", "7.81", "100", "125", "-117"],
-        vec!["1", "781", "7.81", "100", "125", "-117"],
+        vec!["1", "781", "7.81", "100", "125", "-117", "1", "30", "-40"],

Review comment:
       I am not sure about these results. `first_value`, `last_value` and `nth_value` over an unordered window (which this query uses) seem undefined to me.
   
   For example, using `datafusion-cli`:
   
   ```sql
       CREATE EXTERNAL TABLE aggregate_test_100 (
           c1  VARCHAR NOT NULL,
           c2  INT NOT NULL,
           c3  SMALLINT NOT NULL,
           c4  SMALLINT NOT NULL,
           c5  INT NOT NULL,
           c6  BIGINT NOT NULL,
           c7  SMALLINT NOT NULL,
           c8  INT NOT NULL,
           c9  BIGINT NOT NULL,
           c10 VARCHAR NOT NULL,
           c11 FLOAT NOT NULL,
           c12 DOUBLE NOT NULL,
           c13 VARCHAR NOT NULL
       )
       STORED AS CSV
       WITH HEADER ROW
       LOCATION '/Users/alamb/Software/arrow-datafusion/testing/data/csv/';
   ```
   You can see there are many values for `c3`, and the values of `first_value`, `last_value` and `nth_value` seem to be picking some arbitrary rows: 
   
   ```
   > select c2, c3 from aggregate_test_100 order by c2;
   +----+------+
   | c2 | c3   |
   +----+------+
   | 1  | 29   |
   | 1  | -85  |
   | 1  | 38   |
   | 1  | 57   |
   | 1  | 54   |
   | 1  | 103  |
   | 1  | -98  |
   | 1  | -99  |
   | 1  | -25  |
   | 1  | 36   |
   | 1  | 41   |
   | 1  | -8   |
   | 1  | -24  |
   | 1  | 125  |
   | 1  | 70   |
   | 1  | -72  |
   | 1  | 71   |
   | 1  | -56  |
   | 1  | -5   |
   | 1  | 12   |
   | 1  | 83   |
   | 1  | 120  |
   | 2  | 1    |
   | 2  | 113  |
   | 2  | 49   |
   | 2  | 97   |
   | 2  | -29  |
   | 2  | 45   |
   | 2  | -60  |
   | 2  | 93   |
   | 2  | 63   |
   | 2  | 52   |
   | 2  | 31   |
   | 2  | -106 |
   | 2  | -60  |
   | 2  | 68   |
   | 2  | -61  |
   | 2  | 122  |
   | 2  | -48  |
   | 2  | 52   |
   | 2  | -117 |
   | 2  | 29   |
   | 2  | -107 |
   | 2  | -43  |
   | 3  | 104  |
   | 3  | 13   |
   | 3  | 112  |
   | 3  | 77   |
   | 3  | 17   |
   | 3  | 13   |
   | 3  | 73   |
   | 3  | -2   |
   | 3  | 22   |
   | 3  | 17   |
   | 3  | -76  |
   | 3  | 71   |
   | 3  | 14   |
   | 3  | -12  |
   | 3  | -72  |
   | 3  | 97   |
   | 3  | -101 |
   | 3  | -95  |
   | 3  | 123  |
   | 4  | -111 |
   | 4  | -38  |
   | 4  | -54  |
   | 4  | -56  |
   | 4  | -53  |
   | 4  | 123  |
   | 4  | 97   |
   | 4  | 102  |
   | 4  | 65   |
   | 4  | 17   |
   | 4  | 55   |
   | 4  | 73   |
   | 4  | -117 |
   | 4  | -101 |
   | 4  | -79  |
   | 4  | 74   |
   | 4  | 96   |
   | 4  | -90  |
   | 4  | -59  |
   | 4  | 3    |
   | 4  | 5    |
   | 4  | 47   |
   | 4  | 30   |
   | 5  | -40  |
   | 5  | -82  |
   | 5  | 36   |
   | 5  | -31  |
   | 5  | -5   |
   | 5  | 68   |
   | 5  | -59  |
   | 5  | 62   |
   | 5  | -94  |
   | 5  | 64   |
   | 5  | -86  |
   | 5  | 118  |
   | 5  | -101 |
   | 5  | -44  |
   +----+------+
   ```
   
   It is not clear that `1`, `30` and `-40` are the "right" answers (there is no good answer for this dataset)
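To illustrate why there is no good answer, here is a small sketch using a few `(c2, c3)` rows from the output above; the helper names are hypothetical. With no ORDER BY in the window, the result depends on arrival order. Even ordering by `c2` alone is not enough, because rows that tie on `c2` keep whatever relative order they arrived in. Only a total order such as `(c2, c3)` makes the answer deterministic.

```rust
/// A row of the example table as (c2, c3).
type Row = (i32, i16);

/// first_value, last_value and nth_value (1-based `n`) over a single window
/// covering every row, evaluated in whatever order the rows arrive.
fn window_values(rows: &[Row], n: usize) -> (Option<i16>, Option<i16>, Option<i16>) {
    let first = rows.first().map(|r| r.1);
    let last = rows.last().map(|r| r.1);
    let nth = n.checked_sub(1).and_then(|i| rows.get(i)).map(|r| r.1);
    (first, last, nth)
}

/// ORDER BY c2 only: `sort_by_key` is stable, so rows that tie on c2 keep
/// their input order and the result still depends on how rows arrived.
fn order_by_c2(rows: &mut [Row]) {
    rows.sort_by_key(|r| r.0);
}

/// ORDER BY c2, c3: a total order over these rows, so any input order
/// yields the same sequence.
fn order_by_c2_c3(rows: &mut [Row]) {
    rows.sort();
}
```

Reversing the input changes all three results, ordering by `c2` still changes `first_value` (29 versus -85 for these rows), and ordering by `(c2, c3)` makes both inputs agree.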

##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -484,45 +515,106 @@ impl RecordBatchStream for WindowAggStream {
 
 #[cfg(test)]
 mod tests {
-    // use super::*;
-
-    // /// some mock data to test windows
-    // fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
-    //     // define a schema.
-    //     let schema = Arc::new(Schema::new(vec![
-    //         Field::new("a", DataType::UInt32, false),
-    //         Field::new("b", DataType::Float64, false),
-    //     ]));
-
-    //     // define data.
-    //     (
-    //         schema.clone(),
-    //         vec![
-    //             RecordBatch::try_new(
-    //                 schema.clone(),
-    //                 vec![
-    //                     Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
-    //                     Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
-    //                 ],
-    //             )
-    //             .unwrap(),
-    //             RecordBatch::try_new(
-    //                 schema,
-    //                 vec![
-    //                     Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
-    //                     Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
-    //                 ],
-    //             )
-    //             .unwrap(),
-    //         ],
-    //     )
-    // }
-
-    // #[tokio::test]
-    // async fn window_function() -> Result<()> {
-    //     let input: Arc<dyn ExecutionPlan> = unimplemented!();
-    //     let input_schema = input.schema();
-    //     let window_expr = vec![];
-    //     WindowAggExec::try_new(window_expr, input, input_schema);
-    // }
+    use super::*;
+    use crate::physical_plan::aggregates::AggregateFunction;
+    use crate::physical_plan::collect;
+    use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
+    use crate::physical_plan::expressions::col;
+    use crate::test;
+    use arrow::array::*;
+
+    fn create_test_schema(partitions: usize) -> Result<(Arc<CsvExec>, SchemaRef)> {
+        let schema = test::aggr_test_schema();
+        let path = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
+        let csv = CsvExec::try_new(
+            &path,
+            CsvReadOptions::new().schema(&schema),
+            None,
+            1024,
+            None,
+        )?;
+
+        let input = Arc::new(csv);
+        Ok((input, schema))
+    }
+
+    #[tokio::test]
+    async fn window_function_input_partition() -> Result<()> {
+        let (input, schema) = create_test_schema(4)?;
+
+        let window_exec = Arc::new(WindowAggExec::try_new(
+            vec![create_window_expr(
+                &WindowFunction::AggregateFunction(AggregateFunction::Count),
+                &[col("c3")],
+                schema.as_ref(),
+                "count".to_owned(),
+            )?],
+            input,
+            schema.clone(),
+        )?);
+
+        let result = collect(window_exec).await;
+
+        assert!(result.is_err());
+        if let Some(DataFusionError::Internal(msg)) = result.err() {
+            assert_eq!(
+                msg,
+                "WindowAggExec requires a single input partition".to_owned()
+            );
+        } else {
+            unreachable!("Expect an internal error to happen");
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn window_function() -> Result<()> {
+        let (input, schema) = create_test_schema(1)?;

Review comment:
       I think it would be valuable to create a test with more than one partition, so that the data may not arrive at the `WindowFunction` as a single `RecordBatch`.
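A sketch of the kind of property such a test could check: the answer must not change when the same rows are split into batches of different sizes. The `Accumulate` trait is a hypothetical simplification of the PR's `WindowAccumulator` (plain `i64` instead of `ScalarValue`, plus a separate `evaluate` step), and the harness does not use `create_partitioned_csv`. The buggy `NaiveFirst` passes a single-batch test but fails once the input is split.

```rust
/// Simplified, hypothetical stand-in for the PR's WindowAccumulator.
pub trait Accumulate {
    fn scan(&mut self, values: &[i64]);
    fn evaluate(&self) -> Option<i64>;
}

/// Feeds `input` to a fresh accumulator as one batch, then split into
/// batches of every size from 1 to input.len(); returns true only if
/// every split yields the single-batch answer.
pub fn is_batch_invariant<A: Accumulate>(make: impl Fn() -> A, input: &[i64]) -> bool {
    let mut whole = make();
    whole.scan(input);
    let expected = whole.evaluate();
    (1..=input.len().max(1)).all(|size| {
        let mut acc = make();
        for chunk in input.chunks(size) {
            acc.scan(chunk);
        }
        acc.evaluate() == expected
    })
}

/// Buggy first_value: overwrites its state on every scan call.
#[derive(Default)]
pub struct NaiveFirst {
    value: Option<i64>,
}

impl Accumulate for NaiveFirst {
    fn scan(&mut self, values: &[i64]) {
        self.value = values.first().copied();
    }
    fn evaluate(&self) -> Option<i64> {
        self.value
    }
}

/// first_value that keeps the first row it ever sees.
#[derive(Default)]
pub struct First {
    value: Option<i64>,
}

impl Accumulate for First {
    fn scan(&mut self, values: &[i64]) {
        if self.value.is_none() {
            self.value = values.first().copied();
        }
    }
    fn evaluate(&self) -> Option<i64> {
        self.value
    }
}
```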

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression
+#[derive(Debug)]
+pub struct FirstValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl FirstValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for FirstValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            1,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+// sql values start with 1, so we can use 0 to indicate the special last value behavior
+const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
+
+/// last_value expression
+#[derive(Debug)]
+pub struct LastValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl LastValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for LastValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            SPECIAL_SIZE_VALUE_FOR_LAST,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+/// nth_value expression
+#[derive(Debug)]
+pub struct NthValue {
+    name: String,
+    n: u32,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl NthValue {
+    /// Create a new NTH_VALUE window aggregate function
+    pub fn try_new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: String,
+        n: u32,
+        data_type: DataType,
+    ) -> Result<Self> {
+        if n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            Err(DataFusionError::Execution(
+                "nth_value expect n to be > 0".to_owned(),
+            ))
+        } else {
+            Ok(Self {
+                name,
+                n,
+                data_type,
+                expr,
+            })
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for NthValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            self.n,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically
+    // means last; also note that it is totally valid for n to be larger than the number of rows input
+    // in which case all the values shall be null
+    n: u32,
+    offset: u32,
+    value: ScalarValue,
+}
+
+impl NthValueAccumulator {
+    /// new count accumulator
+    pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
+        Ok(Self {
+            n,
+            offset: 0,
+            // null value of that data_type by default
+            value: ScalarValue::try_from(&data_type)?,
+        })
+    }
+}
+
+impl WindowAccumulator for NthValueAccumulator {
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
+        if self.n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            // for last_value function
+            self.value = values[0].clone();

Review comment:
       Should this be `values.last()` rather than the first (`0`th) value?
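A small sketch of the difference, with hypothetical names and `i32` in place of `ScalarValue`. `values[0]` is the first element of the slice (and panics on an empty slice), whereas `values.last()` returns the final element as an `Option`. If `scan` can be called more than once, the accumulator should also let a later non-empty slice overwrite the stored value while ignoring empty slices.

```rust
/// last_value of one slice: the final element, or None (NULL) if empty.
fn last_value<T: Clone>(values: &[T]) -> Option<T> {
    values.last().cloned()
}

/// Stateful variant for repeated scan calls (hypothetical, simplified).
#[derive(Debug)]
struct LastValueAcc<T> {
    value: Option<T>,
}

impl<T: Clone> LastValueAcc<T> {
    fn new() -> Self {
        Self { value: None }
    }

    /// Each non-empty slice is later in the frame, so its last row wins;
    /// an empty slice leaves the previous value untouched.
    fn scan(&mut self, values: &[T]) {
        if let Some(v) = values.last() {
            self.value = Some(v.clone());
        }
    }

    fn evaluate(&self) -> Option<T> {
        self.value.clone()
    }
}
```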

##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression
+#[derive(Debug)]
+pub struct FirstValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl FirstValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for FirstValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            1,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+// sql values start with 1, so we can use 0 to indicate the special last value behavior
+const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
+
+/// last_value expression
+#[derive(Debug)]
+pub struct LastValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl LastValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for LastValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            SPECIAL_SIZE_VALUE_FOR_LAST,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+/// nth_value expression
+#[derive(Debug)]
+pub struct NthValue {
+    name: String,
+    n: u32,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl NthValue {
+    /// Create a new NTH_VALUE window aggregate function
+    pub fn try_new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: String,
+        n: u32,
+        data_type: DataType,
+    ) -> Result<Self> {
+        if n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            Err(DataFusionError::Execution(
+                "nth_value expect n to be > 0".to_owned(),
+            ))
+        } else {
+            Ok(Self {
+                name,
+                n,
+                data_type,
+                expr,
+            })
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for NthValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            self.n,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically
+    // means last; also note that it is totally valid for n to be larger than the number of rows input
+    // in which case all the values shall be null
+    n: u32,
+    offset: u32,
+    value: ScalarValue,
+}
+
+impl NthValueAccumulator {
+    /// new count accumulator
+    pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
+        Ok(Self {
+            n,
+            offset: 0,
+            // null value of that data_type by default
+            value: ScalarValue::try_from(&data_type)?,
+        })
+    }
+}
+
+impl WindowAccumulator for NthValueAccumulator {
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {

Review comment:
       Are we guaranteed that `scan()` will see the entire window in a single call? Or would it be possible to see `scan()` called for two different slices?
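If `scan()` may be called with several slices, one way to stay correct is to keep the running row count across calls. The sketch below mirrors the `n`, `offset` and `value` fields of the PR's `NthValueAccumulator`, but it is a simplified, hypothetical version using `i64` instead of `ScalarValue` and `u64` counters; it only shows that the result is the same however the frame is split.

```rust
/// Streaming NTH_VALUE: `offset` counts rows seen across *all* scan calls.
#[derive(Debug)]
struct NthValueAcc {
    n: u64,             // 1-based target position
    offset: u64,        // rows consumed so far
    value: Option<i64>, // None until the n-th row has been seen (NULL)
}

impl NthValueAcc {
    fn new(n: u64) -> Self {
        assert!(n > 0, "nth_value expects n to be > 0");
        Self { n, offset: 0, value: None }
    }

    fn scan(&mut self, values: &[i64]) {
        let len = values.len() as u64;
        // The target row is in this slice iff offset < n <= offset + len.
        if self.offset < self.n && self.n <= self.offset + len {
            self.value = Some(values[(self.n - self.offset - 1) as usize]);
        }
        self.offset += len;
    }

    fn evaluate(&self) -> Option<i64> {
        self.value
    }
}
```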







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#discussion_r641481820



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression
+#[derive(Debug)]
+pub struct FirstValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl FirstValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for FirstValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            1,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+// sql values start with 1, so we can use 0 to indicate the special last value behavior
+const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
+
+/// last_value expression
+#[derive(Debug)]
+pub struct LastValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl LastValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for LastValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            SPECIAL_SIZE_VALUE_FOR_LAST,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+/// nth_value expression
+#[derive(Debug)]
+pub struct NthValue {
+    name: String,
+    n: u32,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl NthValue {
+    /// Create a new NTH_VALUE window aggregate function
+    pub fn try_new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: String,
+        n: u32,
+        data_type: DataType,
+    ) -> Result<Self> {
+        if n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            Err(DataFusionError::Execution(
+                "nth_value expect n to be > 0".to_owned(),
+            ))
+        } else {
+            Ok(Self {
+                name,
+                n,
+                data_type,
+                expr,
+            })
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for NthValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            self.n,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n the target nth_value, however we'll reuse it for last_value acc, so when n == 0 it specifically
+    // means last; also note that it is totally valid for n to be larger than the number of rows input
+    // in which case all the values shall be null
+    n: u32,
+    offset: u32,
+    value: ScalarValue,
+}
+
+impl NthValueAccumulator {
+    /// new count accumulator
+    pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
+        Ok(Self {
+            n,
+            offset: 0,
+            // null value of that data_type by default
+            value: ScalarValue::try_from(&data_type)?,
+        })
+    }
+}
+
+impl WindowAccumulator for NthValueAccumulator {
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {

Review comment:
       Future performance improvement idea: rather than converting each row to `ScalarValue` and passing `values: &[ScalarValue]`, we should strive to *slice* into the original arrays (i.e. avoid extra work and don't copy data), compute result indices from the offsets, and then use `take` to build new arrays from those indices.
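A sketch of the index-then-gather idea with plain `Vec`s standing in for Arrow arrays, so it runs without the arrow crate. Arrow does provide a `take` compute kernel that gathers array elements by an index array; the local `take` below only mimics that behavior, and `nth_value_indices` plus the `(start, end)` partition ranges are hypothetical. Each output row gets an index into the original column (or `None` for NULL) without converting any value to `ScalarValue`.

```rust
/// Gather by index, like Arrow's `take` kernel; `None` yields a null.
fn take(values: &[i64], indices: &[Option<usize>]) -> Vec<Option<i64>> {
    indices.iter().map(|i| i.map(|i| values[i])).collect()
}

/// NTH_VALUE over partitions given as half-open row ranges [start, end)
/// of the original column, with an unbounded frame: every row of a
/// partition gets the same index (or None if the partition is too short).
fn nth_value_indices(partitions: &[(usize, usize)], n: usize) -> Vec<Option<usize>> {
    let mut out = Vec::new();
    for &(start, end) in partitions {
        // n >= 1 is checked first so `n - 1` cannot underflow
        let idx = if n >= 1 && start + n - 1 < end {
            Some(start + n - 1)
        } else {
            None
        };
        out.extend(std::iter::repeat(idx).take(end - start));
    }
    out
}
```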







[GitHub] [arrow-datafusion] Dandandan commented on pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#issuecomment-850188619


   > > Thank you @Jimexist  -- the  code structure looks clean to me . Nice work
   > > However, I am concerned about the correctness of these results. As I understand it, `first_value`, `last_value` and `nth_value` are not well defined unless there is an ordering on the window (aka without an ordering you basically can get some arbitrary value from the window).
   > > I wonder if it would make sense to implement ordering for windows first, so we can write tests with well-defined output
   > > I also see a change to the `parquet-testing` module; I wonder whether that was intended
   > 
   > I guess it's not arbitrary but rather just takes the ordering as is. When #425 is merged I'll add a test case that compares against psql so the behavior is consistent.
   > 
   > Of course, once the ordering clause is implemented, that behavior can be tested the same way, along with some unit tests.
   > 
   > I plan to implement ordering after this and the lead/lag PR are merged because it requires some structural changes to the planner.
   
   I think the implementation might take the order as is; however, the order produced by the underlying plan may differ between runs. For example, a table scan might return rows in a different order each time it runs.
   





[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #403: add `first_value`, `last_value`, and `nth_value` built-in window functions

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #403:
URL: https://github.com/apache/arrow-datafusion/pull/403#discussion_r642104113



##########
File path: datafusion/src/physical_plan/expressions/nth_value.rs
##########
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+/// first_value expression
+#[derive(Debug)]
+pub struct FirstValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl FirstValue {
+    /// Create a new FIRST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for FirstValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            1,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+// SQL positions start at 1, so we can use 0 to indicate the special LAST_VALUE behavior
+const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;
+
+/// last_value expression
+#[derive(Debug)]
+pub struct LastValue {
+    name: String,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl LastValue {
+    /// Create a new LAST_VALUE window aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+        Self {
+            name,
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for LastValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            SPECIAL_SIZE_VALUE_FOR_LAST,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+/// nth_value expression
+#[derive(Debug)]
+pub struct NthValue {
+    name: String,
+    n: u32,
+    data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl NthValue {
+    /// Create a new NTH_VALUE window aggregate function
+    pub fn try_new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: String,
+        n: u32,
+        data_type: DataType,
+    ) -> Result<Self> {
+        if n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            Err(DataFusionError::Execution(
+                "nth_value expects n to be > 0".to_owned(),
+            ))
+        } else {
+            Ok(Self {
+                name,
+                n,
+                data_type,
+                expr,
+            })
+        }
+    }
+}
+
+impl BuiltInWindowFunctionExpr for NthValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        let nullable = true;
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
+        Ok(Box::new(NthValueAccumulator::try_new(
+            self.n,
+            self.data_type.clone(),
+        )?))
+    }
+}
+
+#[derive(Debug)]
+struct NthValueAccumulator {
+    // n is the target position for nth_value; it is also reused by the last_value accumulator, so
+    // n == 0 specifically means "last". Note that n may legitimately exceed the number of input
+    // rows, in which case all the values are null
+    n: u32,
+    offset: u32,
+    value: ScalarValue,
+}
+
+impl NthValueAccumulator {
+    /// Create a new nth value accumulator
+    pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
+        Ok(Self {
+            n,
+            offset: 0,
+            // null value of that data_type by default
+            value: ScalarValue::try_from(&data_type)?,
+        })
+    }
+}
+
+impl WindowAccumulator for NthValueAccumulator {
+    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
+        if self.n == SPECIAL_SIZE_VALUE_FOR_LAST {
+            // for last_value function
+            self.value = values[0].clone();

Review comment:
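       Here the `values` array is per row: it holds the evaluated arguments of the current row, so `values[0]` is this row's value of the first (and only) argument.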
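       To make the per-row point concrete, here is a simplified, hypothetical sketch of an nth-value accumulator that receives one row's arguments per `scan` call. It is not the PR's code (the diff above is truncated at this line): it uses `Option<i64>` in place of `ScalarValue` (with `None` standing in for SQL NULL), adds an `evaluate` method of its own, and replaces the `n == 0` sentinel (`SPECIAL_SIZE_VALUE_FOR_LAST`) with an enum. `Target`, `NthValueSketch`, and `run` are illustrative names only.

```rust
/// Which row's value to keep. A hypothetical enum standing in for the
/// `n == 0` sentinel used in the diff.
#[derive(Debug, Clone, Copy)]
enum Target {
    Last,
    Nth(u32), // 1-based position, as in SQL
}

/// Simplified stand-in for NthValueAccumulator (assumption: i64 values).
#[derive(Debug)]
struct NthValueSketch {
    target: Target,
    offset: u32,        // number of rows scanned so far
    value: Option<i64>, // NULL until the target row is seen
}

impl NthValueSketch {
    fn try_new(target: Target) -> Result<Self, String> {
        // Positions are 1-based, so n = 0 is rejected, like NthValue::try_new.
        if let Target::Nth(0) = target {
            return Err("nth_value expects n to be > 0".to_owned());
        }
        Ok(Self { target, offset: 0, value: None })
    }

    /// Called once per row; `row[0]` is this row's first argument.
    fn scan(&mut self, row: &[Option<i64>]) {
        self.offset += 1;
        match self.target {
            // LAST_VALUE: every row overwrites the previous value.
            Target::Last => self.value = row[0],
            // NTH_VALUE: keep only the value seen at position n.
            Target::Nth(n) if self.offset == n => self.value = row[0],
            Target::Nth(_) => {}
        }
    }

    fn evaluate(&self) -> Option<i64> {
        self.value
    }
}

/// Feed single-argument rows through a fresh accumulator.
fn run(target: Target, rows: &[Option<i64>]) -> Option<i64> {
    let mut acc = NthValueSketch::try_new(target).expect("valid n");
    for v in rows {
        acc.scan(&[*v]);
    }
    acc.evaluate()
}

fn main() {
    let rows = [Some(10), Some(20), None, Some(40)];
    println!("{:?}", run(Target::Nth(1), &rows)); // FIRST_VALUE: Some(10)
    println!("{:?}", run(Target::Last, &rows)); // LAST_VALUE: Some(40)
    println!("{:?}", run(Target::Nth(3), &rows)); // row 3 is NULL: None
    println!("{:?}", run(Target::Nth(9), &rows)); // n past the end: None
}
```

       With an enum, "last" cannot be confused with an out-of-range position, and the compiler checks that every case is handled; the sentinel in the diff achieves the same effect with a plain `u32`.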



