You are viewing a plain text version of this content. (The link to the canonical version was a hyperlink that was not preserved in this copy.)
Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/08 21:16:26 UTC

[arrow-datafusion] branch master updated: rewrite predicates before pushing to union inputs (#1781)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e8c198b  rewrite predicates before pushing to union inputs (#1781)
e8c198b is described below

commit e8c198b9fac6cd8822b950b9f71898e47965488d
Author: Eduard Karacharov <13...@users.noreply.github.com>
AuthorDate: Wed Feb 9 00:16:08 2022 +0300

    rewrite predicates before pushing to union inputs (#1781)
---
 datafusion/src/optimizer/filter_push_down.rs | 54 +++++++++++++++++++++++++---
 1 file changed, 49 insertions(+), 5 deletions(-)

diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index ababb52..7891131 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,9 +16,9 @@
 
 use crate::datasource::datasource::TableProviderFilterPushDown;
 use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection};
+use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
 use crate::logical_plan::{
-    and, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
+    and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
 };
 use crate::logical_plan::{DFSchema, Expr};
 use crate::optimizer::optimizer::OptimizerRule;
@@ -394,8 +394,29 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             // sort is filter-commutable
             push_down(&state, plan)
         }
-        LogicalPlan::Union(_) => {
-            // union all is filter-commutable
+        LogicalPlan::Union(Union {
+            inputs: _,
+            schema,
+            alias: _,
+        }) => {
+            // union changing all qualifiers while building logical plan so we need
+            // to rewrite filters to push unqualified columns to inputs
+            let projection = schema
+                .fields()
+                .iter()
+                .map(|field| (field.qualified_name(), col(field.name())))
+                .collect::<HashMap<_, _>>();
+
+            // rewriting predicate expressions using unqualified names as replacements
+            if !projection.is_empty() {
+                for (predicate, columns) in state.filters.iter_mut() {
+                    *predicate = rewrite(predicate, &projection)?;
+
+                    columns.clear();
+                    utils::expr_to_columns(predicate, columns)?;
+                }
+            }
+
             push_down(&state, plan)
         }
         LogicalPlan::Limit(Limit { input, .. }) => {
@@ -574,7 +595,9 @@ fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
 mod tests {
     use super::*;
     use crate::datasource::TableProvider;
-    use crate::logical_plan::{lit, sum, DFSchema, Expr, LogicalPlanBuilder, Operator};
+    use crate::logical_plan::{
+        lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, Operator,
+    };
     use crate::physical_plan::ExecutionPlan;
     use crate::test::*;
     use crate::{logical_plan::col, prelude::JoinType};
@@ -901,6 +924,27 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn union_all_with_alias() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let union =
+            union_with_alias(table_scan.clone(), table_scan, Some("t".to_string()))?;
+
+        let plan = LogicalPlanBuilder::from(union)
+            .filter(col("t.a").eq(lit(1i64)))?
+            .build()?;
+
+        // filter is pushed below Union into every input, without the `t` qualifier
+        let expected = "\
+            Union\
+            \n  Filter: #a = Int64(1)\
+            \n    TableScan: test projection=None\
+            \n  Filter: #a = Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
     /// verifies that filters with the same columns are correctly placed
     #[test]
     fn filter_2_breaks_limits() -> Result<()> {