You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 13:35:41 UTC

[arrow-datafusion] 01/01: split expr type and nullability info into the ExprSchemable trait

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch expr-schemable
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit fb710545e00cfa97cb166bc32aec487a9df2d41c
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 21:35:20 2022 +0800

    split expr type and nullability info into the ExprSchemable trait
---
 datafusion/src/logical_plan/builder.rs             |   1 +
 datafusion/src/logical_plan/expr.rs                | 152 +----------------
 datafusion/src/logical_plan/expr_schema.rs         | 180 +++++++++++++++++++++
 datafusion/src/logical_plan/mod.rs                 |   2 +
 .../src/optimizer/common_subexpr_eliminate.rs      |   2 +-
 datafusion/src/optimizer/simplify_expressions.rs   |   8 +-
 datafusion/tests/simplification.rs                 |   1 +
 7 files changed, 191 insertions(+), 155 deletions(-)

diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index d81fa9d..a722238 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -25,6 +25,7 @@ use crate::datasource::{
     MemTable, TableProvider,
 };
 use crate::error::{DataFusionError, Result};
+use crate::logical_plan::expr_schema::ExprSchemable;
 use crate::logical_plan::plan::{
     Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
     TableScan, ToStringifiedPlan, Union, Window,
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 69da346..0c1fac4 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -20,13 +20,10 @@
 
 pub use super::Operator;
 use crate::error::{DataFusionError, Result};
-use crate::field_util::get_indexed_field;
+use crate::logical_plan::ExprSchemable;
 use crate::logical_plan::{window_frames, DFField, DFSchema};
 use crate::physical_plan::functions::Volatility;
-use crate::physical_plan::{
-    aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF,
-    window_functions,
-};
+use crate::physical_plan::{aggregates, functions, udf::ScalarUDF, window_functions};
 use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
 use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
 use arrow::{compute::can_cast_types, datatypes::DataType};
@@ -251,151 +248,6 @@ impl PartialOrd for Expr {
 }
 
 impl Expr {
-    /// Returns the [arrow::datatypes::DataType] of the expression
-    /// based on [ExprSchema]
-    ///
-    /// Note: [DFSchema] implements [ExprSchema].
-    ///
-    /// # Errors
-    ///
-    /// This function errors when it is not possible to compute its
-    /// [arrow::datatypes::DataType].  This happens when e.g. the
-    /// expression refers to a column that does not exist in the
-    /// schema, or when the expression is incorrectly typed
-    /// (e.g. `[utf8] + [bool]`).
-    pub fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType> {
-        match self {
-            Expr::Alias(expr, _) | Expr::Sort { expr, .. } | Expr::Negative(expr) => {
-                expr.get_type(schema)
-            }
-            Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
-            Expr::ScalarVariable(_) => Ok(DataType::Utf8),
-            Expr::Literal(l) => Ok(l.get_datatype()),
-            Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
-            Expr::Cast { data_type, .. } | Expr::TryCast { data_type, .. } => {
-                Ok(data_type.clone())
-            }
-            Expr::ScalarUDF { fun, args } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok((fun.return_type)(&data_types)?.as_ref().clone())
-            }
-            Expr::ScalarFunction { fun, args } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                functions::return_type(fun, &data_types)
-            }
-            Expr::WindowFunction { fun, args, .. } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                window_functions::return_type(fun, &data_types)
-            }
-            Expr::AggregateFunction { fun, args, .. } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                aggregates::return_type(fun, &data_types)
-            }
-            Expr::AggregateUDF { fun, args, .. } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok((fun.return_type)(&data_types)?.as_ref().clone())
-            }
-            Expr::Not(_)
-            | Expr::IsNull(_)
-            | Expr::Between { .. }
-            | Expr::InList { .. }
-            | Expr::IsNotNull(_) => Ok(DataType::Boolean),
-            Expr::BinaryExpr {
-                ref left,
-                ref right,
-                ref op,
-            } => binary_operator_data_type(
-                &left.get_type(schema)?,
-                op,
-                &right.get_type(schema)?,
-            ),
-            Expr::Wildcard => Err(DataFusionError::Internal(
-                "Wildcard expressions are not valid in a logical query plan".to_owned(),
-            )),
-            Expr::GetIndexedField { ref expr, key } => {
-                let data_type = expr.get_type(schema)?;
-
-                get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
-            }
-        }
-    }
-
-    /// Returns the nullability of the expression based on [ExprSchema].
-    ///
-    /// Note: [DFSchema] implements [ExprSchema].
-    ///
-    /// # Errors
-    ///
-    /// This function errors when it is not possible to compute its
-    /// nullability.  This happens when the expression refers to a
-    /// column that does not exist in the schema.
-    pub fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool> {
-        match self {
-            Expr::Alias(expr, _)
-            | Expr::Not(expr)
-            | Expr::Negative(expr)
-            | Expr::Sort { expr, .. }
-            | Expr::Between { expr, .. }
-            | Expr::InList { expr, .. } => expr.nullable(input_schema),
-            Expr::Column(c) => input_schema.nullable(c),
-            Expr::Literal(value) => Ok(value.is_null()),
-            Expr::Case {
-                when_then_expr,
-                else_expr,
-                ..
-            } => {
-                // this expression is nullable if any of the input expressions are nullable
-                let then_nullable = when_then_expr
-                    .iter()
-                    .map(|(_, t)| t.nullable(input_schema))
-                    .collect::<Result<Vec<_>>>()?;
-                if then_nullable.contains(&true) {
-                    Ok(true)
-                } else if let Some(e) = else_expr {
-                    e.nullable(input_schema)
-                } else {
-                    Ok(false)
-                }
-            }
-            Expr::Cast { expr, .. } => expr.nullable(input_schema),
-            Expr::ScalarVariable(_)
-            | Expr::TryCast { .. }
-            | Expr::ScalarFunction { .. }
-            | Expr::ScalarUDF { .. }
-            | Expr::WindowFunction { .. }
-            | Expr::AggregateFunction { .. }
-            | Expr::AggregateUDF { .. } => Ok(true),
-            Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
-            Expr::BinaryExpr {
-                ref left,
-                ref right,
-                ..
-            } => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
-            Expr::Wildcard => Err(DataFusionError::Internal(
-                "Wildcard expressions are not valid in a logical query plan".to_owned(),
-            )),
-            Expr::GetIndexedField { ref expr, key } => {
-                let data_type = expr.get_type(input_schema)?;
-                get_indexed_field(&data_type, key).map(|x| x.is_nullable())
-            }
-        }
-    }
-
     /// Returns the name of this expression based on [crate::logical_plan::DFSchema].
     ///
     /// This represents how a column with this expression is named when no alias is chosen
diff --git a/datafusion/src/logical_plan/expr_schema.rs b/datafusion/src/logical_plan/expr_schema.rs
new file mode 100644
index 0000000..5c128db
--- /dev/null
+++ b/datafusion/src/logical_plan/expr_schema.rs
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::Expr;
+use crate::field_util::get_indexed_field;
+use crate::physical_plan::{
+    aggregates, expressions::binary_operator_data_type, functions, window_functions,
+};
+use arrow::datatypes::DataType;
+use datafusion_common::{DataFusionError, ExprSchema, Result};
+
+/// Trait that allows an expression to be typed with respect to a schema
+pub trait ExprSchemable {
+    /// Given a schema, returns the [DataType] of the expression (errors if it cannot be computed)
+    fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType>;
+
+    /// Given a schema, returns the nullability of the expression (`true` if it may be null)
+    fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool>;
+}
+
+impl ExprSchemable for Expr {
+    /// Returns the [arrow::datatypes::DataType] of the expression
+    /// based on [ExprSchema]
+    ///
+    /// Note: [DFSchema](crate::logical_plan::DFSchema) implements [ExprSchema].
+    ///
+    /// # Errors
+    ///
+    /// This function errors when it is not possible to compute its
+    /// [arrow::datatypes::DataType].  This happens when e.g. the
+    /// expression refers to a column that does not exist in the
+    /// schema, or when the expression is incorrectly typed
+    /// (e.g. `[utf8] + [bool]`).
+    fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType> {
+        match self {
+            Expr::Alias(expr, _) | Expr::Sort { expr, .. } | Expr::Negative(expr) => {
+                expr.get_type(schema)
+            }
+            Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
+            Expr::ScalarVariable(_) => Ok(DataType::Utf8),
+            Expr::Literal(l) => Ok(l.get_datatype()),
+            Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema), // type of first THEN; NOTE(review): panics if when_then_expr is empty
+            Expr::Cast { data_type, .. } | Expr::TryCast { data_type, .. } => {
+                Ok(data_type.clone())
+            }
+            Expr::ScalarUDF { fun, args } => {
+                let data_types = args
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                Ok((fun.return_type)(&data_types)?.as_ref().clone())
+            }
+            Expr::ScalarFunction { fun, args } => {
+                let data_types = args
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                functions::return_type(fun, &data_types)
+            }
+            Expr::WindowFunction { fun, args, .. } => {
+                let data_types = args
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                window_functions::return_type(fun, &data_types)
+            }
+            Expr::AggregateFunction { fun, args, .. } => {
+                let data_types = args
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                aggregates::return_type(fun, &data_types)
+            }
+            Expr::AggregateUDF { fun, args, .. } => {
+                let data_types = args
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                Ok((fun.return_type)(&data_types)?.as_ref().clone())
+            }
+            Expr::Not(_)
+            | Expr::IsNull(_)
+            | Expr::Between { .. }
+            | Expr::InList { .. }
+            | Expr::IsNotNull(_) => Ok(DataType::Boolean),
+            Expr::BinaryExpr {
+                ref left,
+                ref right,
+                ref op,
+            } => binary_operator_data_type(
+                &left.get_type(schema)?,
+                op,
+                &right.get_type(schema)?,
+            ),
+            Expr::Wildcard => Err(DataFusionError::Internal(
+                "Wildcard expressions are not valid in a logical query plan".to_owned(),
+            )),
+            Expr::GetIndexedField { ref expr, key } => {
+                let data_type = expr.get_type(schema)?;
+
+                get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
+            }
+        }
+    }
+
+    /// Returns the nullability of the expression based on [ExprSchema].
+    ///
+    /// Note: [DFSchema](crate::logical_plan::DFSchema) implements [ExprSchema].
+    ///
+    /// # Errors
+    ///
+    /// This function errors when it is not possible to compute its
+    /// nullability.  This happens when e.g. the expression refers to a
+    /// column missing from the schema, or is an [Expr::Wildcard].
+    fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool> {
+        match self {
+            Expr::Alias(expr, _)
+            | Expr::Not(expr)
+            | Expr::Negative(expr)
+            | Expr::Sort { expr, .. }
+            | Expr::Between { expr, .. }
+            | Expr::InList { expr, .. } => expr.nullable(input_schema),
+            Expr::Column(c) => input_schema.nullable(c),
+            Expr::Literal(value) => Ok(value.is_null()),
+            Expr::Case {
+                when_then_expr,
+                else_expr,
+                ..
+            } => {
+                // nullable if any THEN or the ELSE result is nullable; NOTE(review): a CASE with no ELSE yields NULL when no WHEN matches, so Ok(false) below may be wrong — confirm
+                let then_nullable = when_then_expr
+                    .iter()
+                    .map(|(_, t)| t.nullable(input_schema))
+                    .collect::<Result<Vec<_>>>()?;
+                if then_nullable.contains(&true) {
+                    Ok(true)
+                } else if let Some(e) = else_expr {
+                    e.nullable(input_schema)
+                } else {
+                    Ok(false)
+                }
+            }
+            Expr::Cast { expr, .. } => expr.nullable(input_schema),
+            Expr::ScalarVariable(_)
+            | Expr::TryCast { .. }
+            | Expr::ScalarFunction { .. }
+            | Expr::ScalarUDF { .. }
+            | Expr::WindowFunction { .. }
+            | Expr::AggregateFunction { .. }
+            | Expr::AggregateUDF { .. } => Ok(true),
+            Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
+            Expr::BinaryExpr {
+                ref left,
+                ref right,
+                ..
+            } => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
+            Expr::Wildcard => Err(DataFusionError::Internal(
+                "Wildcard expressions are not valid in a logical query plan".to_owned(),
+            )),
+            Expr::GetIndexedField { ref expr, key } => {
+                let data_type = expr.get_type(input_schema)?;
+                get_indexed_field(&data_type, key).map(|x| x.is_nullable())
+            }
+        }
+    }
+}
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 085775a..f2ecb0f 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -26,6 +26,7 @@ mod dfschema;
 mod display;
 mod expr;
 mod expr_rewriter;
+mod expr_schema;
 mod expr_simplier;
 mod expr_visitor;
 mod extension;
@@ -54,6 +55,7 @@ pub use expr_rewriter::{
     normalize_col, normalize_cols, replace_col, rewrite_sort_cols_by_aggs,
     unnormalize_col, unnormalize_cols, ExprRewritable, ExprRewriter, RewriteRecursion,
 };
+pub use expr_schema::ExprSchemable;
 pub use expr_simplier::{ExprSimplifiable, SimplifyInfo};
 pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
 pub use extension::UserDefinedLogicalNode;
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 5c2219b..2ed45be 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -23,7 +23,7 @@ use crate::logical_plan::plan::{Filter, Projection, Window};
 use crate::logical_plan::{
     col,
     plan::{Aggregate, Sort},
-    DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprVisitable,
+    DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprSchemable, ExprVisitable,
     ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion,
 };
 use crate::optimizer::optimizer::OptimizerRule;
diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs
index f8f3df4..4e9709b 100644
--- a/datafusion/src/optimizer/simplify_expressions.rs
+++ b/datafusion/src/optimizer/simplify_expressions.rs
@@ -17,12 +17,9 @@
 
 //! Simplify expressions optimizer rule
 
-use arrow::array::new_null_array;
-use arrow::datatypes::{DataType, Field, Schema};
-use arrow::record_batch::RecordBatch;
-
 use crate::error::DataFusionError;
 use crate::execution::context::ExecutionProps;
+use crate::logical_plan::ExprSchemable;
 use crate::logical_plan::{
     lit, DFSchema, DFSchemaRef, Expr, ExprRewritable, ExprRewriter, ExprSimplifiable,
     LogicalPlan, RewriteRecursion, SimplifyInfo,
@@ -33,6 +30,9 @@ use crate::physical_plan::functions::Volatility;
 use crate::physical_plan::planner::create_physical_expr;
 use crate::scalar::ScalarValue;
 use crate::{error::Result, logical_plan::Operator};
+use arrow::array::new_null_array;
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
 
 /// Provides simplification information based on schema and properties
 struct SimplifyContext<'a, 'b> {
diff --git a/datafusion/tests/simplification.rs b/datafusion/tests/simplification.rs
index 0ce8e76..fe5f5e2 100644
--- a/datafusion/tests/simplification.rs
+++ b/datafusion/tests/simplification.rs
@@ -18,6 +18,7 @@
 //! This program demonstrates the DataFusion expression simplification API.
 
 use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::logical_plan::ExprSchemable;
 use datafusion::logical_plan::ExprSimplifiable;
 use datafusion::{
     error::Result,