You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 13:35:41 UTC
[arrow-datafusion] 01/01: split expr type and null info to be expr-schemable
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch expr-schemable
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit fb710545e00cfa97cb166bc32aec487a9df2d41c
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 21:35:20 2022 +0800
split expr type and null info to be expr-schemable
---
datafusion/src/logical_plan/builder.rs | 1 +
datafusion/src/logical_plan/expr.rs | 152 +----------------
datafusion/src/logical_plan/expr_schema.rs | 180 +++++++++++++++++++++
datafusion/src/logical_plan/mod.rs | 2 +
.../src/optimizer/common_subexpr_eliminate.rs | 2 +-
datafusion/src/optimizer/simplify_expressions.rs | 8 +-
datafusion/tests/simplification.rs | 1 +
7 files changed, 191 insertions(+), 155 deletions(-)
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index d81fa9d..a722238 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -25,6 +25,7 @@ use crate::datasource::{
MemTable, TableProvider,
};
use crate::error::{DataFusionError, Result};
+use crate::logical_plan::expr_schema::ExprSchemable;
use crate::logical_plan::plan::{
Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
TableScan, ToStringifiedPlan, Union, Window,
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 69da346..0c1fac4 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -20,13 +20,10 @@
pub use super::Operator;
use crate::error::{DataFusionError, Result};
-use crate::field_util::get_indexed_field;
+use crate::logical_plan::ExprSchemable;
use crate::logical_plan::{window_frames, DFField, DFSchema};
use crate::physical_plan::functions::Volatility;
-use crate::physical_plan::{
- aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF,
- window_functions,
-};
+use crate::physical_plan::{aggregates, functions, udf::ScalarUDF, window_functions};
use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
use arrow::{compute::can_cast_types, datatypes::DataType};
@@ -251,151 +248,6 @@ impl PartialOrd for Expr {
}
impl Expr {
- /// Returns the [arrow::datatypes::DataType] of the expression
- /// based on [ExprSchema]
- ///
- /// Note: [DFSchema] implements [ExprSchema].
- ///
- /// # Errors
- ///
- /// This function errors when it is not possible to compute its
- /// [arrow::datatypes::DataType]. This happens when e.g. the
- /// expression refers to a column that does not exist in the
- /// schema, or when the expression is incorrectly typed
- /// (e.g. `[utf8] + [bool]`).
- pub fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType> {
- match self {
- Expr::Alias(expr, _) | Expr::Sort { expr, .. } | Expr::Negative(expr) => {
- expr.get_type(schema)
- }
- Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
- Expr::ScalarVariable(_) => Ok(DataType::Utf8),
- Expr::Literal(l) => Ok(l.get_datatype()),
- Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
- Expr::Cast { data_type, .. } | Expr::TryCast { data_type, .. } => {
- Ok(data_type.clone())
- }
- Expr::ScalarUDF { fun, args } => {
- let data_types = args
- .iter()
- .map(|e| e.get_type(schema))
- .collect::<Result<Vec<_>>>()?;
- Ok((fun.return_type)(&data_types)?.as_ref().clone())
- }
- Expr::ScalarFunction { fun, args } => {
- let data_types = args
- .iter()
- .map(|e| e.get_type(schema))
- .collect::<Result<Vec<_>>>()?;
- functions::return_type(fun, &data_types)
- }
- Expr::WindowFunction { fun, args, .. } => {
- let data_types = args
- .iter()
- .map(|e| e.get_type(schema))
- .collect::<Result<Vec<_>>>()?;
- window_functions::return_type(fun, &data_types)
- }
- Expr::AggregateFunction { fun, args, .. } => {
- let data_types = args
- .iter()
- .map(|e| e.get_type(schema))
- .collect::<Result<Vec<_>>>()?;
- aggregates::return_type(fun, &data_types)
- }
- Expr::AggregateUDF { fun, args, .. } => {
- let data_types = args
- .iter()
- .map(|e| e.get_type(schema))
- .collect::<Result<Vec<_>>>()?;
- Ok((fun.return_type)(&data_types)?.as_ref().clone())
- }
- Expr::Not(_)
- | Expr::IsNull(_)
- | Expr::Between { .. }
- | Expr::InList { .. }
- | Expr::IsNotNull(_) => Ok(DataType::Boolean),
- Expr::BinaryExpr {
- ref left,
- ref right,
- ref op,
- } => binary_operator_data_type(
- &left.get_type(schema)?,
- op,
- &right.get_type(schema)?,
- ),
- Expr::Wildcard => Err(DataFusionError::Internal(
- "Wildcard expressions are not valid in a logical query plan".to_owned(),
- )),
- Expr::GetIndexedField { ref expr, key } => {
- let data_type = expr.get_type(schema)?;
-
- get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
- }
- }
- }
-
- /// Returns the nullability of the expression based on [ExprSchema].
- ///
- /// Note: [DFSchema] implements [ExprSchema].
- ///
- /// # Errors
- ///
- /// This function errors when it is not possible to compute its
- /// nullability. This happens when the expression refers to a
- /// column that does not exist in the schema.
- pub fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool> {
- match self {
- Expr::Alias(expr, _)
- | Expr::Not(expr)
- | Expr::Negative(expr)
- | Expr::Sort { expr, .. }
- | Expr::Between { expr, .. }
- | Expr::InList { expr, .. } => expr.nullable(input_schema),
- Expr::Column(c) => input_schema.nullable(c),
- Expr::Literal(value) => Ok(value.is_null()),
- Expr::Case {
- when_then_expr,
- else_expr,
- ..
- } => {
- // this expression is nullable if any of the input expressions are nullable
- let then_nullable = when_then_expr
- .iter()
- .map(|(_, t)| t.nullable(input_schema))
- .collect::<Result<Vec<_>>>()?;
- if then_nullable.contains(&true) {
- Ok(true)
- } else if let Some(e) = else_expr {
- e.nullable(input_schema)
- } else {
- Ok(false)
- }
- }
- Expr::Cast { expr, .. } => expr.nullable(input_schema),
- Expr::ScalarVariable(_)
- | Expr::TryCast { .. }
- | Expr::ScalarFunction { .. }
- | Expr::ScalarUDF { .. }
- | Expr::WindowFunction { .. }
- | Expr::AggregateFunction { .. }
- | Expr::AggregateUDF { .. } => Ok(true),
- Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
- Expr::BinaryExpr {
- ref left,
- ref right,
- ..
- } => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
- Expr::Wildcard => Err(DataFusionError::Internal(
- "Wildcard expressions are not valid in a logical query plan".to_owned(),
- )),
- Expr::GetIndexedField { ref expr, key } => {
- let data_type = expr.get_type(input_schema)?;
- get_indexed_field(&data_type, key).map(|x| x.is_nullable())
- }
- }
- }
-
/// Returns the name of this expression based on [crate::logical_plan::DFSchema].
///
/// This represents how a column with this expression is named when no alias is chosen
diff --git a/datafusion/src/logical_plan/expr_schema.rs b/datafusion/src/logical_plan/expr_schema.rs
new file mode 100644
index 0000000..5c128db
--- /dev/null
+++ b/datafusion/src/logical_plan/expr_schema.rs
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::Expr;
+use crate::field_util::get_indexed_field;
+use crate::physical_plan::{
+ aggregates, expressions::binary_operator_data_type, functions, window_functions,
+};
+use arrow::datatypes::DataType;
+use datafusion_common::{DataFusionError, ExprSchema, Result};
+
+/// trait to allow expr to be typable with respect to a schema
+pub trait ExprSchemable {
+ /// given a schema, return the type of the expr
+ fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType>;
+
+ /// given a schema, return the nullability of the expr
+ fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool>;
+}
+
+impl ExprSchemable for Expr {
+ /// Returns the [arrow::datatypes::DataType] of the expression
+ /// based on [ExprSchema]
+ ///
+ /// Note: [DFSchema] implements [ExprSchema].
+ ///
+ /// # Errors
+ ///
+ /// This function errors when it is not possible to compute its
+ /// [arrow::datatypes::DataType]. This happens when e.g. the
+ /// expression refers to a column that does not exist in the
+ /// schema, or when the expression is incorrectly typed
+ /// (e.g. `[utf8] + [bool]`).
+ fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType> {
+ match self {
+ Expr::Alias(expr, _) | Expr::Sort { expr, .. } | Expr::Negative(expr) => {
+ expr.get_type(schema)
+ }
+ Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
+ Expr::ScalarVariable(_) => Ok(DataType::Utf8),
+ Expr::Literal(l) => Ok(l.get_datatype()),
+ Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
+ Expr::Cast { data_type, .. } | Expr::TryCast { data_type, .. } => {
+ Ok(data_type.clone())
+ }
+ Expr::ScalarUDF { fun, args } => {
+ let data_types = args
+ .iter()
+ .map(|e| e.get_type(schema))
+ .collect::<Result<Vec<_>>>()?;
+ Ok((fun.return_type)(&data_types)?.as_ref().clone())
+ }
+ Expr::ScalarFunction { fun, args } => {
+ let data_types = args
+ .iter()
+ .map(|e| e.get_type(schema))
+ .collect::<Result<Vec<_>>>()?;
+ functions::return_type(fun, &data_types)
+ }
+ Expr::WindowFunction { fun, args, .. } => {
+ let data_types = args
+ .iter()
+ .map(|e| e.get_type(schema))
+ .collect::<Result<Vec<_>>>()?;
+ window_functions::return_type(fun, &data_types)
+ }
+ Expr::AggregateFunction { fun, args, .. } => {
+ let data_types = args
+ .iter()
+ .map(|e| e.get_type(schema))
+ .collect::<Result<Vec<_>>>()?;
+ aggregates::return_type(fun, &data_types)
+ }
+ Expr::AggregateUDF { fun, args, .. } => {
+ let data_types = args
+ .iter()
+ .map(|e| e.get_type(schema))
+ .collect::<Result<Vec<_>>>()?;
+ Ok((fun.return_type)(&data_types)?.as_ref().clone())
+ }
+ Expr::Not(_)
+ | Expr::IsNull(_)
+ | Expr::Between { .. }
+ | Expr::InList { .. }
+ | Expr::IsNotNull(_) => Ok(DataType::Boolean),
+ Expr::BinaryExpr {
+ ref left,
+ ref right,
+ ref op,
+ } => binary_operator_data_type(
+ &left.get_type(schema)?,
+ op,
+ &right.get_type(schema)?,
+ ),
+ Expr::Wildcard => Err(DataFusionError::Internal(
+ "Wildcard expressions are not valid in a logical query plan".to_owned(),
+ )),
+ Expr::GetIndexedField { ref expr, key } => {
+ let data_type = expr.get_type(schema)?;
+
+ get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
+ }
+ }
+ }
+
+ /// Returns the nullability of the expression based on [ExprSchema].
+ ///
+ /// Note: [DFSchema] implements [ExprSchema].
+ ///
+ /// # Errors
+ ///
+ /// This function errors when it is not possible to compute its
+ /// nullability. This happens when the expression refers to a
+ /// column that does not exist in the schema.
+ fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool> {
+ match self {
+ Expr::Alias(expr, _)
+ | Expr::Not(expr)
+ | Expr::Negative(expr)
+ | Expr::Sort { expr, .. }
+ | Expr::Between { expr, .. }
+ | Expr::InList { expr, .. } => expr.nullable(input_schema),
+ Expr::Column(c) => input_schema.nullable(c),
+ Expr::Literal(value) => Ok(value.is_null()),
+ Expr::Case {
+ when_then_expr,
+ else_expr,
+ ..
+ } => {
+ // this expression is nullable if any of the input expressions are nullable
+ let then_nullable = when_then_expr
+ .iter()
+ .map(|(_, t)| t.nullable(input_schema))
+ .collect::<Result<Vec<_>>>()?;
+ if then_nullable.contains(&true) {
+ Ok(true)
+ } else if let Some(e) = else_expr {
+ e.nullable(input_schema)
+ } else {
+ Ok(false)
+ }
+ }
+ Expr::Cast { expr, .. } => expr.nullable(input_schema),
+ Expr::ScalarVariable(_)
+ | Expr::TryCast { .. }
+ | Expr::ScalarFunction { .. }
+ | Expr::ScalarUDF { .. }
+ | Expr::WindowFunction { .. }
+ | Expr::AggregateFunction { .. }
+ | Expr::AggregateUDF { .. } => Ok(true),
+ Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
+ Expr::BinaryExpr {
+ ref left,
+ ref right,
+ ..
+ } => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
+ Expr::Wildcard => Err(DataFusionError::Internal(
+ "Wildcard expressions are not valid in a logical query plan".to_owned(),
+ )),
+ Expr::GetIndexedField { ref expr, key } => {
+ let data_type = expr.get_type(input_schema)?;
+ get_indexed_field(&data_type, key).map(|x| x.is_nullable())
+ }
+ }
+ }
+}
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 085775a..f2ecb0f 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -26,6 +26,7 @@ mod dfschema;
mod display;
mod expr;
mod expr_rewriter;
+mod expr_schema;
mod expr_simplier;
mod expr_visitor;
mod extension;
@@ -54,6 +55,7 @@ pub use expr_rewriter::{
normalize_col, normalize_cols, replace_col, rewrite_sort_cols_by_aggs,
unnormalize_col, unnormalize_cols, ExprRewritable, ExprRewriter, RewriteRecursion,
};
+pub use expr_schema::ExprSchemable;
pub use expr_simplier::{ExprSimplifiable, SimplifyInfo};
pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
pub use extension::UserDefinedLogicalNode;
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 5c2219b..2ed45be 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -23,7 +23,7 @@ use crate::logical_plan::plan::{Filter, Projection, Window};
use crate::logical_plan::{
col,
plan::{Aggregate, Sort},
- DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprVisitable,
+ DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprSchemable, ExprVisitable,
ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion,
};
use crate::optimizer::optimizer::OptimizerRule;
diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs
index f8f3df4..4e9709b 100644
--- a/datafusion/src/optimizer/simplify_expressions.rs
+++ b/datafusion/src/optimizer/simplify_expressions.rs
@@ -17,12 +17,9 @@
//! Simplify expressions optimizer rule
-use arrow::array::new_null_array;
-use arrow::datatypes::{DataType, Field, Schema};
-use arrow::record_batch::RecordBatch;
-
use crate::error::DataFusionError;
use crate::execution::context::ExecutionProps;
+use crate::logical_plan::ExprSchemable;
use crate::logical_plan::{
lit, DFSchema, DFSchemaRef, Expr, ExprRewritable, ExprRewriter, ExprSimplifiable,
LogicalPlan, RewriteRecursion, SimplifyInfo,
@@ -33,6 +30,9 @@ use crate::physical_plan::functions::Volatility;
use crate::physical_plan::planner::create_physical_expr;
use crate::scalar::ScalarValue;
use crate::{error::Result, logical_plan::Operator};
+use arrow::array::new_null_array;
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
/// Provides simplification information based on schema and properties
struct SimplifyContext<'a, 'b> {
diff --git a/datafusion/tests/simplification.rs b/datafusion/tests/simplification.rs
index 0ce8e76..fe5f5e2 100644
--- a/datafusion/tests/simplification.rs
+++ b/datafusion/tests/simplification.rs
@@ -18,6 +18,7 @@
//! This program demonstrates the DataFusion expression simplification API.
use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::logical_plan::ExprSchemable;
use datafusion::logical_plan::ExprSimplifiable;
use datafusion::{
error::Result,