You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/06/15 21:53:54 UTC
[arrow-datafusion] branch master updated: fix alias rewrite In_List for filter push down (#2729)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 06fa71ad3 fix alias rewrite In_List for filter push down (#2729)
06fa71ad3 is described below
commit 06fa71ad32dfaf9b51492dfe7c9350d1cd7163f8
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Thu Jun 16 05:53:48 2022 +0800
fix alias rewrite In_List for filter push down (#2729)
---
datafusion/optimizer/src/filter_push_down.rs | 216 ++++++++++++++++++++++++++-
datafusion/optimizer/src/utils.rs | 6 +-
2 files changed, 220 insertions(+), 2 deletions(-)
diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 7b3b6a21f..561b6b36b 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -654,7 +654,7 @@ mod tests {
use async_trait::async_trait;
use datafusion_common::DFSchema;
use datafusion_expr::{
- and, col, lit,
+ and, col, in_list, lit,
logical_plan::{builder::union_with_alias, JoinType},
sum, Expr, LogicalPlanBuilder, Operator, TableSource, TableType,
};
@@ -1831,4 +1831,218 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_filter_with_alias() -> Result<()> {
+ // In the table scan the real column name is 'test.a',
+ // but the projection renames it to 'b' and the filter uses col 'b',
+ // so the filter column must be rewritten before push down.
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a").alias("b"), col("c")])?
+ .filter(and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64))))?
+ .build()?;
+
+ // filter on col b
+ assert_eq!(
+ format!("{:?}", plan),
+ "\
+ Filter: #b > Int64(10) AND #test.c > Int64(10)\
+ \n Projection: #test.a AS b, #test.c\
+ \n TableScan: test projection=None\
+ "
+ );
+
+ // rewrite filter col b to test.a
+ let expected = "\
+ Projection: #test.a AS b, #test.c\
+ \n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
+ \n TableScan: test projection=None\
+ ";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_filter_with_alias_2() -> Result<()> {
+ // In the table scan the real column name is 'test.a',
+ // but the projection renames it to 'b' and the filter uses col 'b',
+ // so the filter column must be rewritten before push down.
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a").alias("b"), col("c")])?
+ .project(vec![col("b"), col("c")])?
+ .filter(and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64))))?
+ .build()?;
+
+ // filter on col b
+ assert_eq!(
+ format!("{:?}", plan),
+ "\
+ Filter: #b > Int64(10) AND #test.c > Int64(10)\
+ \n Projection: #b, #test.c\
+ \n Projection: #test.a AS b, #test.c\
+ \n TableScan: test projection=None\
+ "
+ );
+
+ // rewrite filter col b to test.a
+ let expected = "\
+ Projection: #b, #test.c\
+ \n Projection: #test.a AS b, #test.c\
+ \n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
+ \n TableScan: test projection=None\
+ ";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_filter_with_multi_alias() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a").alias("b"), col("c").alias("d")])?
+ .filter(and(col("b").gt(lit(10i64)), col("d").gt(lit(10i64))))?
+ .build()?;
+
+ // filter on col b and d
+ assert_eq!(
+ format!("{:?}", plan),
+ "\
+ Filter: #b > Int64(10) AND #d > Int64(10)\
+ \n Projection: #test.a AS b, #test.c AS d\
+ \n TableScan: test projection=None\
+ "
+ );
+
+ // rewrite filter col b to test.a, col d to test.c
+ let expected = "\
+ Projection: #test.a AS b, #test.c AS d\
+ \n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
+ \n TableScan: test projection=None\
+ ";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ /// predicate on join key in filter expression should be pushed down to both inputs
+ #[test]
+ fn join_filter_with_alias() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let left = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a").alias("c")])?
+ .build()?;
+ let right_table_scan = test_table_scan_with_name("test2")?;
+ let right = LogicalPlanBuilder::from(right_table_scan)
+ .project(vec![col("b").alias("d")])?
+ .build()?;
+ let filter = col("c").gt(lit(1u32));
+ let plan = LogicalPlanBuilder::from(left)
+ .join(
+ &right,
+ JoinType::Inner,
+ (vec![Column::from_name("c")], vec![Column::from_name("d")]),
+ Some(filter),
+ )?
+ .build()?;
+
+ assert_eq!(
+ format!("{:?}", plan),
+ "\
+ Inner Join: #c = #d Filter: #c > UInt32(1)\
+ \n Projection: #test.a AS c\
+ \n TableScan: test projection=None\
+ \n Projection: #test2.b AS d\
+ \n TableScan: test2 projection=None"
+ );
+
+ // Change filter on col `c`, `d` to `test.a`, `test2.b`
+ let expected = "\
+ Inner Join: #c = #d\
+ \n Projection: #test.a AS c\
+ \n Filter: #test.a > UInt32(1)\
+ \n TableScan: test projection=None\
+ \n Projection: #test2.b AS d\
+ \n Filter: #test2.b > UInt32(1)\
+ \n TableScan: test2 projection=None";
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_filter_with_alias() -> Result<()> {
+ // In the table scan the real column name is 'test.a',
+ // but the projection renames it to 'b' and the IN-list filter uses col 'b',
+ // so the filter column must be rewritten before push down.
+ let table_scan = test_table_scan()?;
+ let filter_value = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)];
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a").alias("b"), col("c")])?
+ .filter(in_list(col("b"), filter_value, false))?
+ .build()?;
+
+ // filter on col b
+ assert_eq!(
+ format!("{:?}", plan),
+ "\
+ Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
+ \n Projection: #test.a AS b, #test.c\
+ \n TableScan: test projection=None\
+ "
+ );
+
+ // rewrite filter col b to test.a
+ let expected = "\
+ Projection: #test.a AS b, #test.c\
+ \n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
+ \n TableScan: test projection=None\
+ ";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_filter_with_alias_2() -> Result<()> {
+ // In the table scan the real column name is 'test.a',
+ // but the projection renames it to 'b' and the IN-list filter uses col 'b',
+ // so the filter column must be rewritten before push down.
+ let table_scan = test_table_scan()?;
+ let filter_value = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)];
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a").alias("b"), col("c")])?
+ .project(vec![col("b"), col("c")])?
+ .filter(in_list(col("b"), filter_value, false))?
+ .build()?;
+
+ // filter on col b
+ assert_eq!(
+ format!("{:?}", plan),
+ "\
+ Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
+ \n Projection: #b, #test.c\
+ \n Projection: #test.a AS b, #test.c\
+ \n TableScan: test projection=None\
+ "
+ );
+
+ // rewrite filter col b to test.a
+ let expected = "\
+ Projection: #b, #test.c\
+ \n Projection: #test.a AS b, #test.c\
+ \n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
+ \n TableScan: test projection=None\
+ ";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
}
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index 35414f5f8..691d8b663 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -276,9 +276,13 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
}
Expr::Not(_) => Ok(Expr::Not(Box::new(expressions[0].clone()))),
Expr::Negative(_) => Ok(Expr::Negative(Box::new(expressions[0].clone()))),
+ Expr::InList { list, negated, .. } => Ok(Expr::InList {
+ expr: Box::new(expressions[0].clone()),
+ list: list.clone(),
+ negated: *negated,
+ }),
Expr::Column(_)
| Expr::Literal(_)
- | Expr::InList { .. }
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(_)