You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/07 13:43:52 UTC

[arrow-datafusion] branch master updated: Remove qualifiers on pushed down predicates / Fix parquet pruning (#689)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 79d60f9  Remove qualifiers on pushed down predicates / Fix parquet pruning (#689)
79d60f9 is described below

commit 79d60f9b678e9a2351fc83511399663985e39cf6
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Jul 7 09:43:44 2021 -0400

    Remove qualifiers on pushed down predicates / Fix parquet pruning (#689)
    
    * Remove qualifiers on pushed down predicates
    
    * Add test for normalizing and unnormalizing columns
    
    * Fix logical conflict
---
 datafusion/src/logical_plan/expr.rs     | 149 ++++++++++++++++++++++++++++++--
 datafusion/src/logical_plan/mod.rs      |   4 +-
 datafusion/src/physical_plan/planner.rs |  13 ++-
 datafusion/tests/parquet_pruning.rs     |  24 ++---
 4 files changed, 164 insertions(+), 26 deletions(-)

diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 9454d75..59c9979 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -90,14 +90,22 @@ impl Column {
     /// For example, `foo` will be normalized to `t.foo` if there is a
     /// column named `foo` in a relation named `t` found in `schemas`
     pub fn normalize(self, plan: &LogicalPlan) -> Result<Self> {
+        let schemas = plan.all_schemas();
+        let using_columns = plan.using_columns()?;
+        self.normalize_with_schemas(&schemas, &using_columns)
+    }
+
+    // Internal implementation of `normalize` that takes explicit schemas and USING columns
+    fn normalize_with_schemas(
+        self,
+        schemas: &[&Arc<DFSchema>],
+        using_columns: &[HashSet<Column>],
+    ) -> Result<Self> {
         if self.relation.is_some() {
             return Ok(self);
         }
 
-        let schemas = plan.all_schemas();
-        let using_columns = plan.using_columns()?;
-
-        for schema in &schemas {
+        for schema in schemas {
             let fields = schema.fields_with_unqualified_name(&self.name);
             match fields.len() {
                 0 => continue,
@@ -118,7 +126,7 @@ impl Column {
                     // We will use the relation from the first matched field to normalize self.
 
                     // Compare matched fields with one USING JOIN clause at a time
-                    for using_col in &using_columns {
+                    for using_col in using_columns {
                         let all_matched = fields
                             .iter()
                             .all(|f| using_col.contains(&f.qualified_column()));
@@ -1171,22 +1179,39 @@ pub fn replace_col(e: Expr, replace_map: &HashMap<&Column, &Column>) -> Result<E
 
 /// Recursively call [`Column::normalize`] on all Column expressions
 /// in the `expr` expression tree.
-pub fn normalize_col(e: Expr, plan: &LogicalPlan) -> Result<Expr> {
+pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
+    normalize_col_with_schemas(expr, &plan.all_schemas(), &plan.using_columns()?)
+}
+
+/// Recursively normalize all Column expressions in `expr` against the
+/// given `schemas` and `using_columns` rather than a [`LogicalPlan`].
+fn normalize_col_with_schemas(
+    expr: Expr,
+    schemas: &[&Arc<DFSchema>],
+    using_columns: &[HashSet<Column>],
+) -> Result<Expr> {
     struct ColumnNormalizer<'a> {
-        plan: &'a LogicalPlan,
+        schemas: &'a [&'a Arc<DFSchema>],
+        using_columns: &'a [HashSet<Column>],
     }
 
     impl<'a> ExprRewriter for ColumnNormalizer<'a> {
         fn mutate(&mut self, expr: Expr) -> Result<Expr> {
             if let Expr::Column(c) = expr {
-                Ok(Expr::Column(c.normalize(self.plan)?))
+                Ok(Expr::Column(c.normalize_with_schemas(
+                    self.schemas,
+                    self.using_columns,
+                )?))
             } else {
                 Ok(expr)
             }
         }
     }
 
-    e.rewrite(&mut ColumnNormalizer { plan })
+    expr.rewrite(&mut ColumnNormalizer {
+        schemas,
+        using_columns,
+    })
 }
 
 /// Recursively normalize all Column expressions in a list of expression trees
@@ -1198,6 +1223,38 @@ pub fn normalize_cols(
     exprs.into_iter().map(|e| normalize_col(e, plan)).collect()
 }
 
+/// Recursively 'unnormalize' an expression tree by removing the
+/// relation qualifier from every Column expression.
+///
+/// For example, a qualified column `foo.bar` is rewritten to just
+/// `bar`; non-column expressions are left unchanged.
+pub fn unnormalize_col(expr: Expr) -> Expr {
+    struct RemoveQualifier {}
+
+    impl ExprRewriter for RemoveQualifier {
+        fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+            if let Expr::Column(col) = expr {
+                // Keep the column name but drop its relation qualifier
+                Ok(Expr::Column(Column {
+                    relation: None,
+                    name: col.name,
+                }))
+            } else {
+                Ok(expr)
+            }
+        }
+    }
+
+    expr.rewrite(&mut RemoveQualifier {})
+        .expect("Unnormalize is infallable")
+}
+
+/// Recursively un-normalize all Column expressions in a list of expression trees
+#[inline]
+pub fn unnormalize_cols(exprs: impl IntoIterator<Item = Expr>) -> Vec<Expr> {
+    exprs.into_iter().map(unnormalize_col).collect()
+}
+
 /// Create an expression to represent the min() aggregate function
 pub fn min(expr: Expr) -> Expr {
     Expr::AggregateFunction {
@@ -1810,4 +1867,78 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn normalize_cols() {
+        let expr = col("a") + col("b") + col("c");
+
+        // Schemas with some matching and some non-matching columns
+        let schema_a =
+            DFSchema::new(vec![make_field("tableA", "a"), make_field("tableA", "aa")])
+                .unwrap();
+        let schema_c =
+            DFSchema::new(vec![make_field("tableC", "cc"), make_field("tableC", "c")])
+                .unwrap();
+        let schema_b = DFSchema::new(vec![make_field("tableB", "b")]).unwrap();
+        // non-matching
+        let schema_f =
+            DFSchema::new(vec![make_field("tableC", "f"), make_field("tableC", "ff")])
+                .unwrap();
+        let schemas = vec![schema_c, schema_f, schema_b, schema_a]
+            .into_iter()
+            .map(Arc::new)
+            .collect::<Vec<_>>();
+        let schemas = schemas.iter().collect::<Vec<_>>();
+
+        let normalized_expr = normalize_col_with_schemas(expr, &schemas, &[]).unwrap();
+        assert_eq!(
+            normalized_expr,
+            col("tableA.a") + col("tableB.b") + col("tableC.c")
+        );
+    }
+
+    #[test]
+    fn normalize_cols_priority() {
+        let expr = col("a") + col("b");
+        // Schemas with multiple matches for column a, first takes priority
+        let schema_a = DFSchema::new(vec![make_field("tableA", "a")]).unwrap();
+        let schema_b = DFSchema::new(vec![make_field("tableB", "b")]).unwrap();
+        let schema_a2 = DFSchema::new(vec![make_field("tableA2", "a")]).unwrap();
+        let schemas = vec![schema_a2, schema_b, schema_a]
+            .into_iter()
+            .map(Arc::new)
+            .collect::<Vec<_>>();
+        let schemas = schemas.iter().collect::<Vec<_>>();
+
+        let normalized_expr = normalize_col_with_schemas(expr, &schemas, &[]).unwrap();
+        assert_eq!(normalized_expr, col("tableA2.a") + col("tableB.b"));
+    }
+
+    #[test]
+    fn normalize_cols_non_exist() {
+        // test normalizing columns when the name doesn't exist
+        let expr = col("a") + col("b");
+        let schema_a = DFSchema::new(vec![make_field("tableA", "a")]).unwrap();
+        let schemas = vec![schema_a].into_iter().map(Arc::new).collect::<Vec<_>>();
+        let schemas = schemas.iter().collect::<Vec<_>>();
+
+        let error = normalize_col_with_schemas(expr, &schemas, &[])
+            .unwrap_err()
+            .to_string();
+        assert_eq!(
+            error,
+            "Error during planning: Column #b not found in provided schemas"
+        );
+    }
+
+    #[test]
+    fn unnormalize_cols() {
+        let expr = col("tableA.a") + col("tableB.b");
+        let unnormalized_expr = unnormalize_col(expr);
+        assert_eq!(unnormalized_expr, col("a") + col("b"));
+    }
+
+    fn make_field(relation: &str, column: &str) -> DFField {
+        DFField::new(Some(relation), column, DataType::Int8, false)
+    }
 }
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 86a2f56..2c751ab 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -43,8 +43,8 @@ pub use expr::{
     min, normalize_col, normalize_cols, now, octet_length, or, random, regexp_match,
     regexp_replace, repeat, replace, replace_col, reverse, right, round, rpad, rtrim,
     sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos,
-    substr, sum, tan, to_hex, translate, trim, trunc, upper, when, Column, Expr,
-    ExprRewriter, ExpressionVisitor, Literal, Recursion,
+    substr, sum, tan, to_hex, translate, trim, trunc, unnormalize_col, unnormalize_cols,
+    upper, when, Column, Expr, ExprRewriter, ExpressionVisitor, Literal, Recursion,
 };
 pub use extension::UserDefinedLogicalNode;
 pub use operators::Operator;
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 73b2f36..df41683 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,8 +23,9 @@ use super::{
 };
 use crate::execution::context::ExecutionContextState;
 use crate::logical_plan::{
-    DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType,
-    StringifiedPlan, UserDefinedLogicalNode,
+    unnormalize_cols, DFSchema, Expr, LogicalPlan, Operator,
+    Partitioning as LogicalPartitioning, PlanType, StringifiedPlan,
+    UserDefinedLogicalNode,
 };
 use crate::physical_plan::explain::ExplainExec;
 use crate::physical_plan::expressions;
@@ -311,7 +312,13 @@ impl DefaultPhysicalPlanner {
                 filters,
                 limit,
                 ..
-            } => source.scan(projection, batch_size, filters, *limit),
+            } => {
+                // Remove all qualifiers from the pushed-down filters, as the
+                // provider doesn't know (nor should it care) how the relation
+                // was referred to in the query.
+                let filters = unnormalize_cols(filters.iter().cloned());
+                source.scan(projection, batch_size, &filters, *limit)
+            }
             LogicalPlan::Window {
                 input, window_expr, ..
             } => {
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index 86b3946..f5486af 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -44,9 +44,9 @@ async fn prune_timestamps_nanos() {
         .query("SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')")
         .await;
     println!("{}", output.description());
-    // TODO This should prune one metrics without error
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
-    assert_eq!(output.row_groups_pruned(), Some(0));
+    // This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(0));
+    assert_eq!(output.row_groups_pruned(), Some(1));
     assert_eq!(output.result_rows, 10, "{}", output.description());
 }
 
@@ -59,9 +59,9 @@ async fn prune_timestamps_micros() {
         )
         .await;
     println!("{}", output.description());
-    // TODO This should prune one metrics without error
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
-    assert_eq!(output.row_groups_pruned(), Some(0));
+    // This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(0));
+    assert_eq!(output.row_groups_pruned(), Some(1));
     assert_eq!(output.result_rows, 10, "{}", output.description());
 }
 
@@ -74,9 +74,9 @@ async fn prune_timestamps_millis() {
         )
         .await;
     println!("{}", output.description());
-    // TODO This should prune one metrics without error
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
-    assert_eq!(output.row_groups_pruned(), Some(0));
+    // This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(0));
+    assert_eq!(output.row_groups_pruned(), Some(1));
     assert_eq!(output.result_rows, 10, "{}", output.description());
 }
 
@@ -89,9 +89,9 @@ async fn prune_timestamps_seconds() {
         )
         .await;
     println!("{}", output.description());
-    // TODO This should prune one metrics without error
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
-    assert_eq!(output.row_groups_pruned(), Some(0));
+    // This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(0));
+    assert_eq!(output.row_groups_pruned(), Some(1));
     assert_eq!(output.result_rows, 10, "{}", output.description());
 }