You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/06/15 21:53:54 UTC

[arrow-datafusion] branch master updated: fix alias rewrite In_List for filter push down (#2729)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 06fa71ad3 fix alias rewrite In_List for filter push down (#2729)
06fa71ad3 is described below

commit 06fa71ad32dfaf9b51492dfe7c9350d1cd7163f8
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Thu Jun 16 05:53:48 2022 +0800

    fix alias rewrite In_List for filter push down (#2729)
---
 datafusion/optimizer/src/filter_push_down.rs | 216 ++++++++++++++++++++++++++-
 datafusion/optimizer/src/utils.rs            |   6 +-
 2 files changed, 220 insertions(+), 2 deletions(-)

diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 7b3b6a21f..561b6b36b 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -654,7 +654,7 @@ mod tests {
     use async_trait::async_trait;
     use datafusion_common::DFSchema;
     use datafusion_expr::{
-        and, col, lit,
+        and, col, in_list, lit,
         logical_plan::{builder::union_with_alias, JoinType},
         sum, Expr, LogicalPlanBuilder, Operator, TableSource, TableType,
     };
@@ -1831,4 +1831,218 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_filter_with_alias() -> Result<()> {
+        // In the table scan the real column name is 'test.a',
+        // but the projection renames it to 'b' and the filter refers to 'b',
+        // so the filter column must be rewritten before it is pushed down.
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .filter(and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64))))?
+            .build()?;
+
+        // filter on col b
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Filter: #b > Int64(10) AND #test.c > Int64(10)\
+            \n  Projection: #test.a AS b, #test.c\
+            \n    TableScan: test projection=None\
+            "
+        );
+
+        // rewrite filter col b to test.a
+        let expected = "\
+            Projection: #test.a AS b, #test.c\
+            \n  Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
+            \n    TableScan: test projection=None\
+            ";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_filter_with_alias_2() -> Result<()> {
+        // In the table scan the real column name is 'test.a',
+        // but the projection renames it to 'b' and the filter refers to 'b',
+        // so the filter column must be rewritten before it is pushed down.
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .project(vec![col("b"), col("c")])?
+            .filter(and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64))))?
+            .build()?;
+
+        // filter on col b
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Filter: #b > Int64(10) AND #test.c > Int64(10)\
+            \n  Projection: #b, #test.c\
+            \n    Projection: #test.a AS b, #test.c\
+            \n      TableScan: test projection=None\
+            "
+        );
+
+        // rewrite filter col b to test.a
+        let expected = "\
+            Projection: #b, #test.c\
+            \n  Projection: #test.a AS b, #test.c\
+            \n    Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
+            \n      TableScan: test projection=None\
+            ";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_filter_with_multi_alias() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a").alias("b"), col("c").alias("d")])?
+            .filter(and(col("b").gt(lit(10i64)), col("d").gt(lit(10i64))))?
+            .build()?;
+
+        // filter on col b and d
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Filter: #b > Int64(10) AND #d > Int64(10)\
+            \n  Projection: #test.a AS b, #test.c AS d\
+            \n    TableScan: test projection=None\
+            "
+        );
+
+        // rewrite filter col b to test.a, col d to test.c
+        let expected = "\
+            Projection: #test.a AS b, #test.c AS d\
+            \n  Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
+            \n    TableScan: test projection=None\
+            ";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    /// predicate on join key in filter expression should be pushed down to both inputs
+    #[test]
+    fn join_filter_with_alias() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let left = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a").alias("c")])?
+            .build()?;
+        let right_table_scan = test_table_scan_with_name("test2")?;
+        let right = LogicalPlanBuilder::from(right_table_scan)
+            .project(vec![col("b").alias("d")])?
+            .build()?;
+        let filter = col("c").gt(lit(1u32));
+        let plan = LogicalPlanBuilder::from(left)
+            .join(
+                &right,
+                JoinType::Inner,
+                (vec![Column::from_name("c")], vec![Column::from_name("d")]),
+                Some(filter),
+            )?
+            .build()?;
+
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Inner Join: #c = #d Filter: #c > UInt32(1)\
+            \n  Projection: #test.a AS c\
+            \n    TableScan: test projection=None\
+            \n  Projection: #test2.b AS d\
+            \n    TableScan: test2 projection=None"
+        );
+
+        // Rewrite the filter on cols `c`, `d` to `test.a`, `test2.b`
+        let expected = "\
+        Inner Join: #c = #d\
+        \n  Projection: #test.a AS c\
+        \n    Filter: #test.a > UInt32(1)\
+        \n      TableScan: test projection=None\
+        \n  Projection: #test2.b AS d\
+        \n    Filter: #test2.b > UInt32(1)\
+        \n      TableScan: test2 projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn test_in_filter_with_alias() -> Result<()> {
+        // In the table scan the real column name is 'test.a',
+        // but the projection renames it to 'b' and the filter refers to 'b',
+        // so the filter column must be rewritten before it is pushed down.
+        let table_scan = test_table_scan()?;
+        let filter_value = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)];
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .filter(in_list(col("b"), filter_value, false))?
+            .build()?;
+
+        // filter on col b
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
+            \n  Projection: #test.a AS b, #test.c\
+            \n    TableScan: test projection=None\
+            "
+        );
+
+        // rewrite filter col b to test.a
+        let expected = "\
+            Projection: #test.a AS b, #test.c\
+            \n  Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
+            \n    TableScan: test projection=None\
+            ";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_in_filter_with_alias_2() -> Result<()> {
+        // In the table scan the real column name is 'test.a',
+        // but the projection renames it to 'b' and the filter refers to 'b',
+        // so the filter column must be rewritten before it is pushed down.
+        let table_scan = test_table_scan()?;
+        let filter_value = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)];
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .project(vec![col("b"), col("c")])?
+            .filter(in_list(col("b"), filter_value, false))?
+            .build()?;
+
+        // filter on col b
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
+            \n  Projection: #b, #test.c\
+            \n    Projection: #test.a AS b, #test.c\
+            \n      TableScan: test projection=None\
+            "
+        );
+
+        // rewrite filter col b to test.a
+        let expected = "\
+            Projection: #b, #test.c\
+            \n  Projection: #test.a AS b, #test.c\
+            \n    Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
+            \n      TableScan: test projection=None\
+            ";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index 35414f5f8..691d8b663 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -276,9 +276,13 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
         }
         Expr::Not(_) => Ok(Expr::Not(Box::new(expressions[0].clone()))),
         Expr::Negative(_) => Ok(Expr::Negative(Box::new(expressions[0].clone()))),
+        Expr::InList { list, negated, .. } => Ok(Expr::InList {
+            expr: Box::new(expressions[0].clone()),
+            list: list.clone(),
+            negated: *negated,
+        }),
         Expr::Column(_)
         | Expr::Literal(_)
-        | Expr::InList { .. }
         | Expr::Exists { .. }
         | Expr::InSubquery { .. }
         | Expr::ScalarSubquery(_)