You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/22 15:04:47 UTC

[arrow-datafusion] branch main updated: fix: make simplify_expressions use a single schema for resolution (#6077)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0dac1d9f4f fix: make simplify_expressions use a single schema for resolution (#6077)
0dac1d9f4f is described below

commit 0dac1d9f4f942b89ebae22f97aaef1bab825d74f
Author: Christopher M. Wolff <ch...@influxdata.com>
AuthorDate: Sat Apr 22 08:04:41 2023 -0700

    fix: make simplify_expressions use a single schema for resolution (#6077)
    
    * fix: make simplify_expressions use a single schema for resolution
    
    * refactor: Use empty schema when no inputs and not scan
    
    * fix: fmt
    
    ---------
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/expr/src/logical_plan/builder.rs        | 14 ++++-
 .../optimizer/src/simplify_expressions/context.rs  | 42 ++++++--------
 .../src/simplify_expressions/simplify_exprs.rs     | 64 ++++++++++++++++++----
 3 files changed, 81 insertions(+), 39 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index d72f6b462c..efc16fb0d6 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1267,13 +1267,25 @@ pub fn table_scan<'a>(
     name: Option<impl Into<TableReference<'a>>>,
     table_schema: &Schema,
     projection: Option<Vec<usize>>,
+) -> Result<LogicalPlanBuilder> {
+    table_scan_with_filters(name, table_schema, projection, vec![])
+}
+
+/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
+/// and inlined filters.
+/// This is mostly used for testing and documentation.
+pub fn table_scan_with_filters<'a>(
+    name: Option<impl Into<TableReference<'a>>>,
+    table_schema: &Schema,
+    projection: Option<Vec<usize>>,
+    filters: Vec<Expr>,
 ) -> Result<LogicalPlanBuilder> {
     let table_source = table_source(table_schema);
     let name = name
         .map(|n| n.into())
         .unwrap_or_else(|| OwnedTableReference::bare(UNNAMED_TABLE))
         .to_owned_reference();
-    LogicalPlanBuilder::scan(name, table_source, projection)
+    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
 }
 
 fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
diff --git a/datafusion/optimizer/src/simplify_expressions/context.rs b/datafusion/optimizer/src/simplify_expressions/context.rs
index 0fe1e6ae81..34f3908c7e 100644
--- a/datafusion/optimizer/src/simplify_expressions/context.rs
+++ b/datafusion/optimizer/src/simplify_expressions/context.rs
@@ -76,7 +76,7 @@ pub trait SimplifyInfo {
 /// assert_eq!(simplified, col("b").lt(lit(2)));
 /// ```
 pub struct SimplifyContext<'a> {
-    schemas: Vec<DFSchemaRef>,
+    schema: Option<DFSchemaRef>,
     props: &'a ExecutionProps,
 }
 
@@ -84,14 +84,14 @@ impl<'a> SimplifyContext<'a> {
     /// Create a new SimplifyContext
     pub fn new(props: &'a ExecutionProps) -> Self {
         Self {
-            schemas: vec![],
+            schema: None,
             props,
         }
     }
 
     /// Register a [`DFSchemaRef`] with this context
     pub fn with_schema(mut self, schema: DFSchemaRef) -> Self {
-        self.schemas.push(schema);
+        self.schema = Some(schema);
         self
     }
 }
@@ -99,7 +99,7 @@ impl<'a> SimplifyContext<'a> {
 impl<'a> SimplifyInfo for SimplifyContext<'a> {
     /// returns true if this Expr has boolean type
     fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
-        for schema in &self.schemas {
+        for schema in &self.schema {
             if let Ok(DataType::Boolean) = expr.get_type(schema) {
                 return Ok(true);
             }
@@ -110,32 +110,22 @@ impl<'a> SimplifyInfo for SimplifyContext<'a> {
 
     /// Returns true if expr is nullable
     fn nullable(&self, expr: &Expr) -> Result<bool> {
-        self.schemas
-            .iter()
-            .find_map(|schema| {
-                // expr may be from another input, so ignore errors
-                // by converting to None to keep trying
-                expr.nullable(schema.as_ref()).ok()
-            })
-            .ok_or_else(|| {
-                // This means we weren't able to compute `Expr::nullable` with
-                // *any* input schemas, signalling a problem
-                DataFusionError::Internal(format!(
-                    "Could not find columns in '{expr}' during simplify"
-                ))
-            })
+        let schema = self.schema.as_ref().ok_or_else(|| {
+            DataFusionError::Internal(
+                "attempt to get nullability without schema".to_string(),
+            )
+        })?;
+        expr.nullable(schema.as_ref())
     }
 
     /// Returns data type of this expr needed for determining optimized int type of a value
     fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
-        if self.schemas.len() == 1 {
-            expr.get_type(&self.schemas[0])
-        } else {
-            Err(DataFusionError::Internal(
-                "The expr has more than one schema, could not determine data type"
-                    .to_string(),
-            ))
-        }
+        let schema = self.schema.as_ref().ok_or_else(|| {
+            DataFusionError::Internal(
+                "attempt to get data type without schema".to_string(),
+            )
+        })?;
+        expr.get_type(schema)
     }
 
     fn execution_props(&self) -> &ExecutionProps {
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 9591563f02..6717fe42bf 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -17,10 +17,12 @@
 
 //! Simplify expressions optimizer rule and implementation
 
+use std::sync::Arc;
+
 use super::{ExprSimplifier, SimplifyContext};
 use crate::utils::merge_schema;
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::{DFSchemaRef, Result};
+use datafusion_common::{DFSchema, DFSchemaRef, Result};
 use datafusion_expr::{logical_plan::LogicalPlan, utils::from_plan};
 use datafusion_physical_expr::execution_props::ExecutionProps;
 
@@ -61,16 +63,16 @@ impl SimplifyExpressions {
         plan: &LogicalPlan,
         execution_props: &ExecutionProps,
     ) -> Result<LogicalPlan> {
-        // Pass down the `children merge schema` and `plan schema` to evaluate expression types.
-        // pass all `child schema` and `plan schema` isn't enough, because like `t1 semi join t2 on
-        // on t1.id = t2.id`, each individual schema can't contain all the columns in it.
-        let children_merge_schema = DFSchemaRef::new(merge_schema(plan.inputs()));
-        let schemas = vec![plan.schema(), &children_merge_schema];
-        let info = schemas
-            .into_iter()
-            .fold(SimplifyContext::new(execution_props), |context, schema| {
-                context.with_schema(schema.clone())
-            });
+        let schema = if !plan.inputs().is_empty() {
+            DFSchemaRef::new(merge_schema(plan.inputs()))
+        } else if let LogicalPlan::TableScan(_) = plan {
+            // When predicates are pushed into a table scan, there needs to be
+            // a schema to resolve the fields against.
+            Arc::clone(plan.schema())
+        } else {
+            Arc::new(DFSchema::empty())
+        };
+        let info = SimplifyContext::new(execution_props).with_schema(schema);
 
         let simplifier = ExprSimplifier::new(info);
 
@@ -127,7 +129,8 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use chrono::{DateTime, TimeZone, Utc};
     use datafusion_common::ScalarValue;
-    use datafusion_expr::{or, BinaryExpr, Cast, Operator};
+    use datafusion_expr::logical_plan::builder::table_scan_with_filters;
+    use datafusion_expr::{call_fn, or, BinaryExpr, Cast, Operator};
 
     use crate::OptimizerContext;
     use datafusion_expr::logical_plan::table_scan;
@@ -808,4 +811,41 @@ mod tests {
 
         assert_optimized_plan_eq(&plan, expected)
     }
+
+    #[test]
+    fn simplify_project_scalar_fn() -> Result<()> {
+        // Issue https://github.com/apache/arrow-datafusion/issues/5996
+        let schema = Schema::new(vec![Field::new("f", DataType::Float64, false)]);
+        let plan = table_scan(Some("test"), &schema, None)?
+            .project(vec![call_fn("power", vec![col("f"), lit(1.0)])?])?
+            .build()?;
+
+        // before simplify: power(t.f, 1.0)
+        // after simplify:  t.f as "power(t.f, 1.0)"
+        let expected = "Projection: test.f AS power(test.f,Float64(1))\
+                      \n  TableScan: test";
+
+        assert_optimized_plan_eq(&plan, expected)
+    }
+
+    #[test]
+    fn simplify_scan_predicate() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new("f", DataType::Float64, false),
+            Field::new("g", DataType::Float64, false),
+        ]);
+        let plan = table_scan_with_filters(
+            Some("test"),
+            &schema,
+            None,
+            vec![col("g").eq(call_fn("power", vec![col("f"), lit(1.0)])?)],
+        )?
+        .build()?;
+
+        // before simplify: t.g = power(t.f, 1.0)
+        // after simplify:  (t.g = t.f) as "t.g = power(t.f, 1.0)"
+        let expected =
+            "TableScan: test, unsupported_filters=[g = f AS g = power(f,Float64(1))]";
+        assert_optimized_plan_eq(&plan, expected)
+    }
 }