You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/20 13:51:12 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #334: Add window expression part 1 - logical and physical planning, structure, to/from proto, and explain, for empty over clause only

alamb commented on a change in pull request #334:
URL: https://github.com/apache/arrow-datafusion/pull/334#discussion_r636098205



##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -289,6 +288,37 @@ impl LogicalPlanBuilder {
         }))
     }
 
+    /// Apply a window
+    pub fn window(

Review comment:
       ```suggestion
       /// 
       /// NOTE: this feature is under development and this API will be changing 
       pub fn window(
   ```

##########
File path: datafusion/src/physical_plan/window_functions.rs
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Window functions provide the ability to perform calculations across
+//! sets of rows that are related to the current query row.
+//!
+//! see also https://www.postgresql.org/docs/current/functions-window.html
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    aggregates, aggregates::AggregateFunction, functions::Signature,
+    type_coercion::data_types,
+};
+use arrow::datatypes::DataType;
+use std::{fmt, str::FromStr};
+
+/// WindowFunction
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum WindowFunction {
+    /// window function that leverages an aggregate function
+    AggregateFunction(AggregateFunction),
+    /// window function that leverages a built-in window function
+    BuiltInWindowFunction(BuiltInWindowFunction),
+}
+
+impl FromStr for WindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<WindowFunction> {
+        if let Ok(aggregate) = AggregateFunction::from_str(name) {
+            Ok(WindowFunction::AggregateFunction(aggregate))
+        } else if let Ok(built_in_function) = BuiltInWindowFunction::from_str(name) {
+            Ok(WindowFunction::BuiltInWindowFunction(built_in_function))
+        } else {
+            Err(DataFusionError::Plan(format!(
+                "There is no built-in function named {}",
+                name
+            )))
+        }
+    }
+}
+
+impl fmt::Display for BuiltInWindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // uppercase of the debug.
+        write!(f, "{}", format!("{:?}", self).to_uppercase())
+    }
+}
+
+impl fmt::Display for WindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            WindowFunction::AggregateFunction(fun) => fun.fmt(f),
+            WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
+        }
+    }
+}
+
+/// An aggregate function that is part of a built-in window function
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum BuiltInWindowFunction {
+    /// row number
+    RowNumber,
+    /// rank
+    Rank,
+    /// dense rank
+    DenseRank,
+    /// lag
+    Lag,
+    /// lead
+    Lead,
+    /// first value
+    FirstValue,
+    /// last value
+    LastValue,
+}
+
+impl FromStr for BuiltInWindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<BuiltInWindowFunction> {

Review comment:
       I wonder if this should be a case-insensitive comparison (i.e. I think `RANK` should also match `BuiltInWindowFunction::Rank`)? I can't remember whether that is handled higher up in the planner or not -- it may be worth a test in any case.
   
   

##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -289,6 +288,37 @@ impl LogicalPlanBuilder {
         }))
     }
 
+    /// Apply a window
+    pub fn window(
+        &self,
+        window_expr: impl IntoIterator<Item = Expr>,
+        // partition_by_expr: impl IntoIterator<Item = Expr>,
+        // order_by_expr: impl IntoIterator<Item = Expr>,
+        // window_frame: Option<WindowFrame>,

Review comment:
       I think SQL also offers a "FILTER" clause for window aggregates, so we may want to leave room for that in the API
   
   ```suggestion
           // filter: impl IntoIterator<Item = Expr>,
           // partition_by_expr: impl IntoIterator<Item = Expr>,
           // order_by_expr: impl IntoIterator<Item = Expr>,
           // window_frame: Option<WindowFrame>,
   ```
   This page https://sqlite.org/windowfunctions.html does a pretty good job with the syntax

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -2641,13 +2701,23 @@ mod tests {
     }
 
     #[test]
-    fn over_not_supported() {
+    fn empty_over() {
         let sql = "SELECT order_id, MAX(order_id) OVER () from orders";
-        let err = logical_plan(sql).expect_err("query should have failed");
-        assert_eq!(
-            "NotImplemented(\"Unsupported OVER clause ()\")",
-            format!("{:?}", err)
-        );
+        let expected = "\
+        Projection: #order_id, #MAX(order_id)\
+        \n  WindowAggr: windowExpr=[[MAX(#order_id)]] partitionBy=[], orderBy=[]\
+        \n    TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    #[test]
+    fn empty_over_plus() {

Review comment:
       I would like to see some additional tests to increase the coverage:
   1. Multiple window functions in the same query (as the frontend code supports this)
   2. Built-in window functions (e.g. `ROW_NUMBER()`, `row_number()`, and `lead(col)`)
   3. A test in `tests/sql.rs` that runs queries that have an `OVER()` clause and checks that they fail with a "NotYetImplemented" error (to exercise the physical planning logic)

##########
File path: datafusion/src/physical_plan/window_functions.rs
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Window functions provide the ability to perform calculations across
+//! sets of rows that are related to the current query row.
+//!
+//! see also https://www.postgresql.org/docs/current/functions-window.html
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    aggregates, aggregates::AggregateFunction, functions::Signature,
+    type_coercion::data_types,
+};
+use arrow::datatypes::DataType;
+use std::{fmt, str::FromStr};
+
+/// WindowFunction
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum WindowFunction {
+    /// window function that leverages an aggregate function
+    AggregateFunction(AggregateFunction),
+    /// window function that leverages a built-in window function
+    BuiltInWindowFunction(BuiltInWindowFunction),
+}
+
+impl FromStr for WindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<WindowFunction> {
+        if let Ok(aggregate) = AggregateFunction::from_str(name) {
+            Ok(WindowFunction::AggregateFunction(aggregate))
+        } else if let Ok(built_in_function) = BuiltInWindowFunction::from_str(name) {
+            Ok(WindowFunction::BuiltInWindowFunction(built_in_function))
+        } else {
+            Err(DataFusionError::Plan(format!(
+                "There is no built-in function named {}",
+                name
+            )))
+        }
+    }
+}
+
+impl fmt::Display for BuiltInWindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // uppercase of the debug.
+        write!(f, "{}", format!("{:?}", self).to_uppercase())
+    }
+}
+
+impl fmt::Display for WindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            WindowFunction::AggregateFunction(fun) => fun.fmt(f),
+            WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
+        }
+    }
+}
+
+/// An aggregate function that is part of a built-in window function
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum BuiltInWindowFunction {
+    /// row number
+    RowNumber,
+    /// rank
+    Rank,
+    /// dense rank
+    DenseRank,
+    /// lag
+    Lag,
+    /// lead
+    Lead,
+    /// first value
+    FirstValue,
+    /// last value
+    LastValue,
+}
+
+impl FromStr for BuiltInWindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
+        Ok(match name {
+            "row_number" => BuiltInWindowFunction::RowNumber,
+            "rank" => BuiltInWindowFunction::Rank,
+            "dense_rank" => BuiltInWindowFunction::DenseRank,
+            "first_value" => BuiltInWindowFunction::FirstValue,
+            "last_value" => BuiltInWindowFunction::LastValue,
+            "lag" => BuiltInWindowFunction::Lag,
+            "lead" => BuiltInWindowFunction::Lead,
+            _ => {
+                return Err(DataFusionError::Plan(format!(
+                    "There is no built-in function named {}",

Review comment:
       ```suggestion
                       "There is no built-in window function named {}",
   ```

##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -289,6 +288,37 @@ impl LogicalPlanBuilder {
         }))
     }
 
+    /// Apply a window
+    pub fn window(
+        &self,
+        window_expr: impl IntoIterator<Item = Expr>,
+        // partition_by_expr: impl IntoIterator<Item = Expr>,
+        // order_by_expr: impl IntoIterator<Item = Expr>,
+        // window_frame: Option<WindowFrame>,
+    ) -> Result<Self> {
+        let window_expr = window_expr.into_iter().collect::<Vec<Expr>>();
+        // let partition_by_expr = partition_by_expr.into_iter().collect::<Vec<Expr>>();
+        // let order_by_expr = order_by_expr.into_iter().collect::<Vec<Expr>>();
+        let all_expr = window_expr.iter();
+        validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;
+
+        let mut window_fields: Vec<DFField> =
+            exprlist_to_fields(all_expr, self.plan.schema())?;
+        window_fields.extend_from_slice(self.plan.schema().fields());
+
+        Ok(Self::from(&LogicalPlan::Window {
+            input: Arc::new(self.plan.clone()),
+            // FIXME implement next
+            // partition_by_expr,
+            // FIXME implement next
+            // order_by_expr,
+            // FIXME implement next
+            // window_frame,
+            window_expr,

Review comment:
       Likewise, here it may make sense to have a filter clause (FIXME) too
   

##########
File path: datafusion/src/physical_plan/window_functions.rs
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Window functions provide the ability to perform calculations across
+//! sets of rows that are related to the current query row.
+//!
+//! see also https://www.postgresql.org/docs/current/functions-window.html
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    aggregates, aggregates::AggregateFunction, functions::Signature,
+    type_coercion::data_types,
+};
+use arrow::datatypes::DataType;
+use std::{fmt, str::FromStr};
+
+/// WindowFunction
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum WindowFunction {
+    /// window function that leverages an aggregate function
+    AggregateFunction(AggregateFunction),
+    /// window function that leverages a built-in window function
+    BuiltInWindowFunction(BuiltInWindowFunction),
+}
+
+impl FromStr for WindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<WindowFunction> {
+        if let Ok(aggregate) = AggregateFunction::from_str(name) {
+            Ok(WindowFunction::AggregateFunction(aggregate))
+        } else if let Ok(built_in_function) = BuiltInWindowFunction::from_str(name) {
+            Ok(WindowFunction::BuiltInWindowFunction(built_in_function))
+        } else {
+            Err(DataFusionError::Plan(format!(
+                "There is no built-in function named {}",
+                name
+            )))
+        }
+    }
+}
+
+impl fmt::Display for BuiltInWindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // uppercase of the debug.
+        write!(f, "{}", format!("{:?}", self).to_uppercase())
+    }
+}
+
+impl fmt::Display for WindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            WindowFunction::AggregateFunction(fun) => fun.fmt(f),
+            WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
+        }
+    }
+}
+
+/// An aggregate function that is part of a built-in window function
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum BuiltInWindowFunction {
+    /// row number
+    RowNumber,
+    /// rank
+    Rank,
+    /// dense rank
+    DenseRank,
+    /// lag
+    Lag,
+    /// lead
+    Lead,
+    /// first value
+    FirstValue,
+    /// last value
+    LastValue,
+}

Review comment:
       Would it be possible to include all the built-in window functions (and generate errors for those that are not supported)?
   
   Comparing this list with https://www.postgresql.org/docs/9.1/functions-window.html it seems like `percent_rank`, `cume_dist`, `ntile`, `nth_value` are not present.
   
   I am not suggesting you need to implement those functions, merely add space for them in the enum. Once you have set the pattern of how to implement such functions, those additional functions would make for good "first time contributor" type tickets for people who want to help out. 

##########
File path: ballista/rust/core/proto/ballista.proto
##########
@@ -151,6 +153,25 @@ message AggregateExprNode {
   LogicalExprNode expr = 2;
 }
 
+enum BuiltInWindowFunction {
+  ROW_NUMBER = 0;
+  RANK = 1;
+  DENSE_RANK = 2;
+  LAG = 3;
+  LEAD = 4;
+  FIRST_VALUE = 5;
+  LAST_VALUE = 6;
+}
+
+message WindowExprNode {
+  oneof window_function {
+    AggregateFunction aggr_function = 1;

Review comment:
       I think SQL is confusing in this area -- as @Jimexist  says, all "normal" aggregate functions (e.g. sum, count, etc) are also valid window functions, but the reverse is not true. You can't use window functions (e.g. LAG, LEAD, etc) outside of a window clause.
   
   Thus I think representing window functions as a new type of function, as this PR does, makes the most sense. They are different enough (e.g. require information on the incoming windows) that trying to wrangle them into the same structures as normal aggregates seems like it will get messy. Long term I would expect we have a UDWF (user defined window function) api as well.
   
   Ideally the physical implementation for `sum` / `count` / etc. can be mostly reused, but in the plans I think they are different enough to warrant different plan structures. 

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -2641,13 +2701,23 @@ mod tests {
     }
 
     #[test]
-    fn over_not_supported() {
+    fn empty_over() {

Review comment:
       👍  Great

##########
File path: datafusion/src/physical_plan/windows.rs
##########
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for window functions
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    aggregates, window_functions::WindowFunction, AggregateExpr, Distribution,
+    ExecutionPlan, Partitioning, PhysicalExpr, SendableRecordBatchStream, WindowExpr,
+};
+use arrow::datatypes::{Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Window execution plan
+#[derive(Debug)]
+pub struct WindowAggExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Window function expression
+    window_expr: Vec<Arc<dyn WindowExpr>>,
+    /// Schema after the window is run
+    schema: SchemaRef,
+    /// Schema before the window
+    input_schema: SchemaRef,
+}
+
+/// Create a physical expression for window function
+pub fn create_window_expr(
+    fun: &WindowFunction,
+    args: &[Arc<dyn PhysicalExpr>],
+    input_schema: &Schema,
+    name: String,
+) -> Result<Arc<dyn WindowExpr>> {
+    match fun {
+        WindowFunction::AggregateFunction(fun) => Ok(Arc::new(AggregateWindowExpr {
+            aggregate: aggregates::create_aggregate_expr(
+                fun,
+                false,
+                args,
+                input_schema,
+                name,
+            )?,
+        })),
+        WindowFunction::BuiltInWindowFunction(fun) => {
+            Err(DataFusionError::NotImplemented(format!(
+                "window funtion with {:?} not implemented",
+                fun
+            )))
+        }
+    }
+}
+
+/// A window expr that takes the form of a built in window function
+#[derive(Debug)]
+pub struct BuiltInWindowExpr {}
+
+/// A window expr that takes the form of an aggregate function
+#[derive(Debug)]
+pub struct AggregateWindowExpr {
+    aggregate: Arc<dyn AggregateExpr>,
+}
+
+impl WindowExpr for AggregateWindowExpr {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        &self.aggregate.name()
+    }
+
+    fn field(&self) -> Result<Field> {
+        self.aggregate.field()
+    }
+}
+
+fn create_schema(
+    input_schema: &Schema,
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Schema> {
+    let mut fields = Vec::with_capacity(input_schema.fields().len() + window_expr.len());
+    for expr in window_expr {
+        fields.push(expr.field()?);
+    }
+    fields.extend_from_slice(input_schema.fields());
+    Ok(Schema::new(fields))
+}
+
+impl WindowAggExec {
+    /// Create a new execution plan for window aggregates
+    pub fn try_new(
+        window_expr: Vec<Arc<dyn WindowExpr>>,
+        input: Arc<dyn ExecutionPlan>,
+        input_schema: SchemaRef,
+    ) -> Result<Self> {
+        let schema = create_schema(&input.schema(), &window_expr)?;
+        let schema = Arc::new(schema);
+        Ok(WindowAggExec {
+            input,
+            window_expr,
+            schema,
+            input_schema,
+        })
+    }
+
+    /// Input plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Get the input schema before any aggregates are applied
+    pub fn input_schema(&self) -> SchemaRef {
+        self.input_schema.clone()
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for WindowAggExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::SinglePartition
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(WindowAggExec::try_new(
+                self.window_expr.clone(),
+                children[0].clone(),
+                children[0].schema(),
+            )?)),
+            _ => Err(DataFusionError::Internal(
+                "WindowAggExec wrong number of children".to_owned(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "WindowAggExec invalid partition {}",
+                partition
+            )));
+        }
+
+        // window needs to operate on a single partition currently

Review comment:
       👍 
   
   Eventually it would be cool to push the partitioning expressions into a `RepartitionExec` so that we can execute the window functions in parallel on different windows but that is definitely an optimization for the future (not this initial PR) :)

##########
File path: datafusion/src/physical_plan/window_functions.rs
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Window functions provide the ability to perform calculations across
+//! sets of rows that are related to the current query row.
+//!
+//! see also https://www.postgresql.org/docs/current/functions-window.html
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    aggregates, aggregates::AggregateFunction, functions::Signature,
+    type_coercion::data_types,
+};
+use arrow::datatypes::DataType;
+use std::{fmt, str::FromStr};
+
+/// WindowFunction
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum WindowFunction {
+    /// window function that leverages an aggregate function
+    AggregateFunction(AggregateFunction),
+    /// window function that leverages a built-in window function
+    BuiltInWindowFunction(BuiltInWindowFunction),
+}
+
+impl FromStr for WindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<WindowFunction> {
+        if let Ok(aggregate) = AggregateFunction::from_str(name) {
+            Ok(WindowFunction::AggregateFunction(aggregate))
+        } else if let Ok(built_in_function) = BuiltInWindowFunction::from_str(name) {
+            Ok(WindowFunction::BuiltInWindowFunction(built_in_function))
+        } else {
+            Err(DataFusionError::Plan(format!(
+                "There is no built-in function named {}",
+                name
+            )))
+        }
+    }
+}
+
+impl fmt::Display for BuiltInWindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // uppercase of the debug.
+        write!(f, "{}", format!("{:?}", self).to_uppercase())
+    }
+}
+
+impl fmt::Display for WindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            WindowFunction::AggregateFunction(fun) => fun.fmt(f),
+            WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
+        }
+    }
+}
+
+/// An aggregate function that is part of a built-in window function
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum BuiltInWindowFunction {
+    /// row number
+    RowNumber,
+    /// rank
+    Rank,
+    /// dense rank
+    DenseRank,
+    /// lag
+    Lag,
+    /// lead
+    Lead,
+    /// first value
+    FirstValue,
+    /// last value
+    LastValue,
+}
+
+impl FromStr for BuiltInWindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
+        Ok(match name {
+            "row_number" => BuiltInWindowFunction::RowNumber,
+            "rank" => BuiltInWindowFunction::Rank,
+            "dense_rank" => BuiltInWindowFunction::DenseRank,
+            "first_value" => BuiltInWindowFunction::FirstValue,
+            "last_value" => BuiltInWindowFunction::LastValue,
+            "lag" => BuiltInWindowFunction::Lag,
+            "lead" => BuiltInWindowFunction::Lead,
+            _ => {
+                return Err(DataFusionError::Plan(format!(
+                    "There is no built-in function named {}",
+                    name
+                )))
+            }
+        })
+    }
+}
+
+/// Returns the datatype of the scalar function

Review comment:
       ```suggestion
   /// Returns the datatype of the window function
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org