You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/22 15:04:47 UTC
[arrow-datafusion] branch main updated: fix: make simplify_expressions use a single schema for resolution (#6077)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0dac1d9f4f fix: make simplify_expressions use a single schema for resolution (#6077)
0dac1d9f4f is described below
commit 0dac1d9f4f942b89ebae22f97aaef1bab825d74f
Author: Christopher M. Wolff <ch...@influxdata.com>
AuthorDate: Sat Apr 22 08:04:41 2023 -0700
fix: make simplify_expressions use a single schema for resolution (#6077)
* fix: make simplify_expressions use a single schema for resolution
* refactor: Use empty schema when no inputs and not scan
* fix: fmt
---------
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/expr/src/logical_plan/builder.rs | 14 ++++-
.../optimizer/src/simplify_expressions/context.rs | 42 ++++++--------
.../src/simplify_expressions/simplify_exprs.rs | 64 ++++++++++++++++++----
3 files changed, 81 insertions(+), 39 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index d72f6b462c..efc16fb0d6 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1267,13 +1267,25 @@ pub fn table_scan<'a>(
name: Option<impl Into<TableReference<'a>>>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
+) -> Result<LogicalPlanBuilder> {
+ table_scan_with_filters(name, table_schema, projection, vec![])
+}
+
+/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
+/// and inlined filters.
+/// This is mostly used for testing and documentation.
+pub fn table_scan_with_filters<'a>(
+ name: Option<impl Into<TableReference<'a>>>,
+ table_schema: &Schema,
+ projection: Option<Vec<usize>>,
+ filters: Vec<Expr>,
) -> Result<LogicalPlanBuilder> {
let table_source = table_source(table_schema);
let name = name
.map(|n| n.into())
.unwrap_or_else(|| OwnedTableReference::bare(UNNAMED_TABLE))
.to_owned_reference();
- LogicalPlanBuilder::scan(name, table_source, projection)
+ LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
}
fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
diff --git a/datafusion/optimizer/src/simplify_expressions/context.rs b/datafusion/optimizer/src/simplify_expressions/context.rs
index 0fe1e6ae81..34f3908c7e 100644
--- a/datafusion/optimizer/src/simplify_expressions/context.rs
+++ b/datafusion/optimizer/src/simplify_expressions/context.rs
@@ -76,7 +76,7 @@ pub trait SimplifyInfo {
/// assert_eq!(simplified, col("b").lt(lit(2)));
/// ```
pub struct SimplifyContext<'a> {
- schemas: Vec<DFSchemaRef>,
+ schema: Option<DFSchemaRef>,
props: &'a ExecutionProps,
}
@@ -84,14 +84,14 @@ impl<'a> SimplifyContext<'a> {
/// Create a new SimplifyContext
pub fn new(props: &'a ExecutionProps) -> Self {
Self {
- schemas: vec![],
+ schema: None,
props,
}
}
/// Register a [`DFSchemaRef`] with this context
pub fn with_schema(mut self, schema: DFSchemaRef) -> Self {
- self.schemas.push(schema);
+ self.schema = Some(schema);
self
}
}
@@ -99,7 +99,7 @@ impl<'a> SimplifyContext<'a> {
impl<'a> SimplifyInfo for SimplifyContext<'a> {
/// returns true if this Expr has boolean type
fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
- for schema in &self.schemas {
+ for schema in &self.schema {
if let Ok(DataType::Boolean) = expr.get_type(schema) {
return Ok(true);
}
@@ -110,32 +110,22 @@ impl<'a> SimplifyInfo for SimplifyContext<'a> {
/// Returns true if expr is nullable
fn nullable(&self, expr: &Expr) -> Result<bool> {
- self.schemas
- .iter()
- .find_map(|schema| {
- // expr may be from another input, so ignore errors
- // by converting to None to keep trying
- expr.nullable(schema.as_ref()).ok()
- })
- .ok_or_else(|| {
- // This means we weren't able to compute `Expr::nullable` with
- // *any* input schemas, signalling a problem
- DataFusionError::Internal(format!(
- "Could not find columns in '{expr}' during simplify"
- ))
- })
+ let schema = self.schema.as_ref().ok_or_else(|| {
+ DataFusionError::Internal(
+ "attempt to get nullability without schema".to_string(),
+ )
+ })?;
+ expr.nullable(schema.as_ref())
}
/// Returns data type of this expr needed for determining optimized int type of a value
fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
- if self.schemas.len() == 1 {
- expr.get_type(&self.schemas[0])
- } else {
- Err(DataFusionError::Internal(
- "The expr has more than one schema, could not determine data type"
- .to_string(),
- ))
- }
+ let schema = self.schema.as_ref().ok_or_else(|| {
+ DataFusionError::Internal(
+ "attempt to get data type without schema".to_string(),
+ )
+ })?;
+ expr.get_type(schema)
}
fn execution_props(&self) -> &ExecutionProps {
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 9591563f02..6717fe42bf 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -17,10 +17,12 @@
//! Simplify expressions optimizer rule and implementation
+use std::sync::Arc;
+
use super::{ExprSimplifier, SimplifyContext};
use crate::utils::merge_schema;
use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::{DFSchemaRef, Result};
+use datafusion_common::{DFSchema, DFSchemaRef, Result};
use datafusion_expr::{logical_plan::LogicalPlan, utils::from_plan};
use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -61,16 +63,16 @@ impl SimplifyExpressions {
plan: &LogicalPlan,
execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
- // Pass down the `children merge schema` and `plan schema` to evaluate expression types.
- // pass all `child schema` and `plan schema` isn't enough, because like `t1 semi join t2 on
- // on t1.id = t2.id`, each individual schema can't contain all the columns in it.
- let children_merge_schema = DFSchemaRef::new(merge_schema(plan.inputs()));
- let schemas = vec![plan.schema(), &children_merge_schema];
- let info = schemas
- .into_iter()
- .fold(SimplifyContext::new(execution_props), |context, schema| {
- context.with_schema(schema.clone())
- });
+ let schema = if !plan.inputs().is_empty() {
+ DFSchemaRef::new(merge_schema(plan.inputs()))
+ } else if let LogicalPlan::TableScan(_) = plan {
+ // When predicates are pushed into a table scan, there needs to be
+ // a schema to resolve the fields against.
+ Arc::clone(plan.schema())
+ } else {
+ Arc::new(DFSchema::empty())
+ };
+ let info = SimplifyContext::new(execution_props).with_schema(schema);
let simplifier = ExprSimplifier::new(info);
@@ -127,7 +129,8 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use chrono::{DateTime, TimeZone, Utc};
use datafusion_common::ScalarValue;
- use datafusion_expr::{or, BinaryExpr, Cast, Operator};
+ use datafusion_expr::logical_plan::builder::table_scan_with_filters;
+ use datafusion_expr::{call_fn, or, BinaryExpr, Cast, Operator};
use crate::OptimizerContext;
use datafusion_expr::logical_plan::table_scan;
@@ -808,4 +811,41 @@ mod tests {
assert_optimized_plan_eq(&plan, expected)
}
+
+ #[test]
+ fn simplify_project_scalar_fn() -> Result<()> {
+ // Issue https://github.com/apache/arrow-datafusion/issues/5996
+ let schema = Schema::new(vec![Field::new("f", DataType::Float64, false)]);
+ let plan = table_scan(Some("test"), &schema, None)?
+ .project(vec![call_fn("power", vec![col("f"), lit(1.0)])?])?
+ .build()?;
+
+ // before simplify: power(t.f, 1.0)
+ // after simplify: t.f as "power(t.f, 1.0)"
+ let expected = "Projection: test.f AS power(test.f,Float64(1))\
+ \n TableScan: test";
+
+ assert_optimized_plan_eq(&plan, expected)
+ }
+
+ #[test]
+ fn simplify_scan_predicate() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new("f", DataType::Float64, false),
+ Field::new("g", DataType::Float64, false),
+ ]);
+ let plan = table_scan_with_filters(
+ Some("test"),
+ &schema,
+ None,
+ vec![col("g").eq(call_fn("power", vec![col("f"), lit(1.0)])?)],
+ )?
+ .build()?;
+
+ // before simplify: t.g = power(t.f, 1.0)
+ // after simplify: (t.g = t.f) as "t.g = power(t.f, 1.0)"
+ let expected =
+ "TableScan: test, unsupported_filters=[g = f AS g = power(f,Float64(1))]";
+ assert_optimized_plan_eq(&plan, expected)
+ }
}