You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by ho...@apache.org on 2021/08/02 03:46:47 UTC
[arrow-datafusion] branch master updated: Convert unsupported
conditions in left right join to filters (#796)
This is an automated email from the ASF dual-hosted git repository.
houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 979dc31 Convert unsupported conditions in left right join to filters (#796)
979dc31 is described below
commit 979dc3139c3e2aa1f2995c018375aea36cdf6274
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Mon Aug 2 05:46:43 2021 +0200
Convert unsupported conditions in left right join to filters (#796)
* Unsupported conditions in join
* Simplify, support right join, add tests
* Add tests
* Add comments
* Clippy
---
benchmarks/src/bin/tpch.rs | 5 ++
datafusion/src/sql/planner.rs | 128 +++++++++++++++++++++++++++++++++++++-----
datafusion/tests/sql.rs | 46 ++++++++++++++-
3 files changed, 164 insertions(+), 15 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 08b8864..42755ec 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -745,6 +745,11 @@ mod tests {
}
#[tokio::test]
+ async fn run_q13() -> Result<()> {
+ run_query(13).await
+ }
+
+ #[tokio::test]
async fn run_q14() -> Result<()> {
run_query(14).await
}
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 6d9484b..481f12b 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -31,6 +31,7 @@ use crate::logical_plan::{
DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema,
ToStringifiedPlan,
};
+use crate::optimizer::utils::exprlist_to_columns;
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
use crate::{
@@ -325,16 +326,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let right = self.create_relation(&join.relation, ctes)?;
match &join.join_operator {
JoinOperator::LeftOuter(constraint) => {
- self.parse_join(left, &right, constraint, JoinType::Left)
+ self.parse_join(left, right, constraint, JoinType::Left)
}
JoinOperator::RightOuter(constraint) => {
- self.parse_join(left, &right, constraint, JoinType::Right)
+ self.parse_join(left, right, constraint, JoinType::Right)
}
JoinOperator::Inner(constraint) => {
- self.parse_join(left, &right, constraint, JoinType::Inner)
+ self.parse_join(left, right, constraint, JoinType::Inner)
}
JoinOperator::FullOuter(constraint) => {
- self.parse_join(left, &right, constraint, JoinType::Full)
+ self.parse_join(left, right, constraint, JoinType::Full)
}
JoinOperator::CrossJoin => self.parse_cross_join(left, &right),
other => Err(DataFusionError::NotImplemented(format!(
@@ -354,7 +355,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fn parse_join(
&self,
left: LogicalPlan,
- right: &LogicalPlan,
+ right: LogicalPlan,
constraint: &JoinConstraint,
join_type: JoinType,
) -> Result<LogicalPlan> {
@@ -372,18 +373,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// extract join keys
extract_join_keys(&expr, &mut keys, &mut filter);
+ let mut cols = HashSet::new();
+ exprlist_to_columns(&filter, &mut cols)?;
+
let (left_keys, right_keys): (Vec<Column>, Vec<Column>) =
keys.into_iter().unzip();
- // return the logical plan representing the join
- let join = LogicalPlanBuilder::from(left).join(
- right,
- join_type,
- (left_keys, right_keys),
- )?;
+ // return the logical plan representing the join
if filter.is_empty() {
+ let join = LogicalPlanBuilder::from(left).join(
+ &right,
+ join_type,
+ (left_keys, right_keys),
+ )?;
join.build()
} else if join_type == JoinType::Inner {
+ let join = LogicalPlanBuilder::from(left).join(
+ &right,
+ join_type,
+ (left_keys, right_keys),
+ )?;
join.filter(
filter
.iter()
@@ -391,6 +400,64 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.fold(filter[0].clone(), |acc, e| acc.and(e.clone())),
)?
.build()
+ }
+ // Left join with all non-equijoin expressions from the right
+ // l left join r
+ // on l1=r1 and r2 > [..]
+ else if join_type == JoinType::Left
+ && cols.iter().all(
+ |Column {
+ relation: qualifier,
+ name,
+ }| {
+ right
+ .schema()
+ .field_with_name(qualifier.as_deref(), name)
+ .is_ok()
+ },
+ )
+ {
+ LogicalPlanBuilder::from(left)
+ .join(
+ &LogicalPlanBuilder::from(right)
+ .filter(
+ filter
+ .iter()
+ .skip(1)
+ .fold(filter[0].clone(), |acc, e| {
+ acc.and(e.clone())
+ }),
+ )?
+ .build()?,
+ join_type,
+ (left_keys, right_keys),
+ )?
+ .build()
+ }
+ // Right join with all non-equijoin expressions from the left
+ // l right join r
+ // on l1=r1 and l2 > [..]
+ else if join_type == JoinType::Right
+ && cols.iter().all(
+ |Column {
+ relation: qualifier,
+ name,
+ }| {
+ left.schema()
+ .field_with_name(qualifier.as_deref(), name)
+ .is_ok()
+ },
+ )
+ {
+ LogicalPlanBuilder::from(left)
+ .filter(
+ filter
+ .iter()
+ .skip(1)
+ .fold(filter[0].clone(), |acc, e| acc.and(e.clone())),
+ )?
+ .join(&right, join_type, (left_keys, right_keys))?
+ .build()
} else {
Err(DataFusionError::NotImplemented(format!(
"Unsupported expressions in {:?} JOIN: {:?}",
@@ -404,7 +471,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.map(|x| Column::from_name(x.value.clone()))
.collect();
LogicalPlanBuilder::from(left)
- .join_using(right, join_type, keys)?
+ .join_using(&right, join_type, keys)?
.build()
}
JoinConstraint::Natural => {
@@ -1650,9 +1717,16 @@ fn extract_join_keys(
extract_join_keys(left, accum, accum_filter);
extract_join_keys(right, accum, accum_filter);
}
- _other => {
+ _other
+ if matches!(**left, Expr::Column(_))
+ || matches!(**right, Expr::Column(_)) =>
+ {
accum_filter.push(expr.clone());
}
+ _other => {
+ extract_join_keys(left, accum, accum_filter);
+ extract_join_keys(right, accum, accum_filter);
+ }
},
_other => {
accum_filter.push(expr.clone());
@@ -2812,6 +2886,34 @@ mod tests {
}
#[test]
+ fn left_equijoin_unsupported_expression() {
+ let sql = "SELECT id, order_id \
+ FROM person \
+ LEFT JOIN orders \
+ ON id = customer_id AND order_id > 1";
+ let expected = "Projection: #person.id, #orders.order_id\
+ \n Join: #person.id = #orders.customer_id\
+ \n TableScan: person projection=None\
+ \n Filter: #orders.order_id Gt Int64(1)\
+ \n TableScan: orders projection=None";
+ quick_test(sql, expected);
+ }
+
+ #[test]
+ fn right_equijoin_unsupported_expression() {
+ let sql = "SELECT id, order_id \
+ FROM person \
+ RIGHT JOIN orders \
+ ON id = customer_id AND id > 1";
+ let expected = "Projection: #person.id, #orders.order_id\
+ \n Join: #person.id = #orders.customer_id\
+ \n Filter: #person.id Gt Int64(1)\
+ \n TableScan: person projection=None\
+ \n TableScan: orders projection=None";
+ quick_test(sql, expected);
+ }
+
+ #[test]
fn join_with_table_name() {
let sql = "SELECT id, order_id \
FROM person \
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index bfe2f2f..42a7d20 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -1766,13 +1766,55 @@ async fn equijoin_and_other_condition() -> Result<()> {
}
#[tokio::test]
+async fn equijoin_left_and_condition_from_right() -> Result<()> {
+ let mut ctx = create_join_context("t1_id", "t2_id")?;
+ let sql =
+ "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
+ let res = ctx.create_logical_plan(sql);
+ assert!(res.is_ok());
+ let actual = execute(&mut ctx, sql).await;
+
+ let expected = vec![
+ vec!["11", "a", "z"],
+ vec!["22", "b", "y"],
+ vec!["33", "c", "NULL"],
+ vec!["44", "d", "NULL"],
+ ];
+ assert_eq!(expected, actual);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn equijoin_right_and_condition_from_left() -> Result<()> {
+ let mut ctx = create_join_context("t1_id", "t2_id")?;
+ let sql =
+ "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t1_id >= 22 ORDER BY t2_name";
+ let res = ctx.create_logical_plan(sql);
+ assert!(res.is_ok());
+ let actual = execute(&mut ctx, sql).await;
+
+ let expected = vec![
+ vec!["NULL", "NULL", "w"],
+ vec!["44", "d", "x"],
+ vec!["22", "b", "y"],
+ vec!["NULL", "NULL", "z"],
+ ];
+ assert_eq!(expected, actual);
+
+ Ok(())
+}
+
+#[tokio::test]
async fn equijoin_and_unsupported_condition() -> Result<()> {
let ctx = create_join_context("t1_id", "t2_id")?;
let sql =
- "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
+ "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= '44' ORDER BY t1_id";
let res = ctx.create_logical_plan(sql);
+
assert!(res.is_err());
- assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t2_name GtEq Utf8(\"y\")]");
+ assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t1_id GtEq Utf8(\"44\")]");
+
Ok(())
}