You are viewing a plain-text version of this content. The link to its canonical version did not survive conversion to plain text.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/08 21:16:26 UTC
[arrow-datafusion] branch master updated: rewrite predicates before pushing to union inputs (#1781)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e8c198b rewrite predicates before pushing to union inputs (#1781)
e8c198b is described below
commit e8c198b9fac6cd8822b950b9f71898e47965488d
Author: Eduard Karacharov <13...@users.noreply.github.com>
AuthorDate: Wed Feb 9 00:16:08 2022 +0300
rewrite predicates before pushing to union inputs (#1781)
---
datafusion/src/optimizer/filter_push_down.rs | 54 +++++++++++++++++++++++++---
1 file changed, 49 insertions(+), 5 deletions(-)
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index ababb52..7891131 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,9 +16,9 @@
use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection};
+use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
use crate::logical_plan::{
- and, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
+ and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
@@ -394,8 +394,29 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
// sort is filter-commutable
push_down(&state, plan)
}
- LogicalPlan::Union(_) => {
- // union all is filter-commutable
+ LogicalPlan::Union(Union {
+ inputs: _,
+ schema,
+ alias: _,
+ }) => {
+ // union changing all qualifiers while building logical plan so we need
+ // to rewrite filters to push unqualified columns to inputs
+ let projection = schema
+ .fields()
+ .iter()
+ .map(|field| (field.qualified_name(), col(field.name())))
+ .collect::<HashMap<_, _>>();
+
+ // rewriting predicate expressions using unqualified names as replacements
+ if !projection.is_empty() {
+ for (predicate, columns) in state.filters.iter_mut() {
+ *predicate = rewrite(predicate, &projection)?;
+
+ columns.clear();
+ utils::expr_to_columns(predicate, columns)?;
+ }
+ }
+
push_down(&state, plan)
}
LogicalPlan::Limit(Limit { input, .. }) => {
@@ -574,7 +595,9 @@ fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
mod tests {
use super::*;
use crate::datasource::TableProvider;
- use crate::logical_plan::{lit, sum, DFSchema, Expr, LogicalPlanBuilder, Operator};
+ use crate::logical_plan::{
+ lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, Operator,
+ };
use crate::physical_plan::ExecutionPlan;
use crate::test::*;
use crate::{logical_plan::col, prelude::JoinType};
@@ -901,6 +924,27 @@ mod tests {
Ok(())
}
+ #[test]
+ fn union_all_with_alias() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let union =
+ union_with_alias(table_scan.clone(), table_scan, Some("t".to_string()))?;
+
+ let plan = LogicalPlanBuilder::from(union)
+ .filter(col("t.a").eq(lit(1i64)))?
+ .build()?;
+
+ // filter appears below Union without relation qualifier
+ let expected = "\
+ Union\
+ \n Filter: #a = Int64(1)\
+ \n TableScan: test projection=None\
+ \n Filter: #a = Int64(1)\
+ \n TableScan: test projection=None";
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
/// verifies that filters with the same columns are correctly placed
#[test]
fn filter_2_breaks_limits() -> Result<()> {