You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/07/28 07:16:59 UTC
[arrow-datafusion] branch master updated: JOIN conditions are order
dependent (#778)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5416341 JOIN conditions are order dependent (#778)
5416341 is described below
commit 54163410da05e8e6c68af55d699bf6a89e229bb6
Author: Mike Seddon <se...@gmail.com>
AuthorDate: Wed Jul 28 17:16:52 2021 +1000
JOIN conditions are order dependent (#778)
* allow either order joins
* refactor to individual condition level
* change join signature to 'join_keys' tuple
---
.../rust/core/src/serde/logical_plan/from_proto.rs | 3 +-
ballista/rust/core/src/serde/logical_plan/mod.rs | 2 +-
datafusion/src/execution/dataframe_impl.rs | 3 +-
datafusion/src/logical_plan/builder.rs | 36 ++++++---
datafusion/src/optimizer/filter_push_down.rs | 9 +--
datafusion/src/optimizer/projection_push_down.rs | 4 +-
datafusion/src/sql/planner.rs | 9 ++-
datafusion/tests/sql.rs | 90 +++++++++++++++++-----
8 files changed, 108 insertions(+), 48 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index cad0543..38b5257 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -272,8 +272,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
JoinConstraint::On => builder.join(
&convert_box_required!(join.right)?,
join_type.into(),
- left_keys,
- right_keys,
+ (left_keys, right_keys),
)?,
JoinConstraint::Using => builder.join_using(
&convert_box_required!(join.right)?,
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 0d27c58..f6dbeaf 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -701,7 +701,7 @@ mod roundtrip_tests {
CsvReadOptions::new().schema(&schema).has_header(true),
Some(vec![0, 3, 4]),
)
- .and_then(|plan| plan.join(&scan_plan, JoinType::Inner, vec!["id"], vec!["id"]))
+ .and_then(|plan| plan.join(&scan_plan, JoinType::Inner, (vec!["id"], vec!["id"])))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 4edd01c..451c4c7 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -117,8 +117,7 @@ impl DataFrame for DataFrameImpl {
.join(
&right.to_logical_plan(),
join_type,
- left_cols.to_vec(),
- right_cols.to_vec(),
+ (left_cols.to_vec(), right_cols.to_vec()),
)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 60e0ed3..a742f34 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -273,23 +273,37 @@ impl LogicalPlanBuilder {
&self,
right: &LogicalPlan,
join_type: JoinType,
- left_keys: Vec<impl Into<Column>>,
- right_keys: Vec<impl Into<Column>>,
+ join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
) -> Result<Self> {
- if left_keys.len() != right_keys.len() {
+ if join_keys.0.len() != join_keys.1.len() {
return Err(DataFusionError::Plan(
"left_keys and right_keys were not the same length".to_string(),
));
}
- let left_keys: Vec<Column> = left_keys
- .into_iter()
- .map(|c| c.into().normalize(&self.plan))
- .collect::<Result<_>>()?;
- let right_keys: Vec<Column> = right_keys
- .into_iter()
- .map(|c| c.into().normalize(right))
- .collect::<Result<_>>()?;
+ let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
+ join_keys
+ .0
+ .into_iter()
+ .zip(join_keys.1.into_iter())
+ .map(|(l, r)| {
+ let mut swap = false;
+ let l = l.into();
+ let left_key = l.clone().normalize(&self.plan).or_else(|_| {
+ swap = true;
+ l.normalize(right)
+ });
+ if swap {
+ (r.into().normalize(&self.plan), left_key)
+ } else {
+ (left_key, r.into().normalize(right))
+ }
+ })
+ .unzip();
+
+ let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
+ let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;
+
let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect();
let join_schema =
build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 399923e..039e92d 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -973,8 +973,7 @@ mod tests {
.join(
&right,
JoinType::Inner,
- vec![Column::from_name("a")],
- vec![Column::from_name("a")],
+ (vec![Column::from_name("a")], vec![Column::from_name("a")]),
)?
.filter(col("a").lt_eq(lit(1i64)))?
.build()?;
@@ -1058,8 +1057,7 @@ mod tests {
.join(
&right,
JoinType::Inner,
- vec![Column::from_name("a")],
- vec![Column::from_name("a")],
+ (vec![Column::from_name("a")], vec![Column::from_name("a")]),
)?
// "b" and "c" are not shared by either side: they are only available together after the join
.filter(col("c").lt_eq(col("b")))?
@@ -1099,8 +1097,7 @@ mod tests {
.join(
&right,
JoinType::Inner,
- vec![Column::from_name("a")],
- vec![Column::from_name("a")],
+ (vec![Column::from_name("a")], vec![Column::from_name("a")]),
)?
.filter(col("b").lt_eq(lit(1i64)))?
.build()?;
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 0de36f3..96c5094 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -555,7 +555,7 @@ mod tests {
LogicalPlanBuilder::scan_empty(Some("test2"), &schema, None)?.build()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .join(&table2_scan, JoinType::Left, vec!["a"], vec!["c1"])?
+ .join(&table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]))?
.project(vec![col("a"), col("b"), col("c1")])?
.build()?;
@@ -594,7 +594,7 @@ mod tests {
LogicalPlanBuilder::scan_empty(Some("test2"), &schema, None)?.build()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .join(&table2_scan, JoinType::Left, vec!["a"], vec!["c1"])?
+ .join(&table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]))?
// projecting joined column `a` should push the right side column `c1` projection as
// well into test2 table even though `c1` is not referenced in projection.
.project(vec![col("a"), col("b")])?
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index fa2b035..6d9484b 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -375,8 +375,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let (left_keys, right_keys): (Vec<Column>, Vec<Column>) =
keys.into_iter().unzip();
// return the logical plan representing the join
- let join = LogicalPlanBuilder::from(left)
- .join(right, join_type, left_keys, right_keys)?;
+ let join = LogicalPlanBuilder::from(left).join(
+ right,
+ join_type,
+ (left_keys, right_keys),
+ )?;
if filter.is_empty() {
join.build()
@@ -548,7 +551,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
join_keys.iter().map(|(_, r)| r.clone()).collect();
let builder = LogicalPlanBuilder::from(left);
left = builder
- .join(right, JoinType::Inner, left_keys, right_keys)?
+ .join(right, JoinType::Inner, (left_keys, right_keys))?
.build()?;
}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index d9f7c6e..bfe2f2f 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -1717,15 +1717,40 @@ fn create_case_context() -> Result<ExecutionContext> {
#[tokio::test]
async fn equijoin() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
- let sql =
- "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
- let actual = execute(&mut ctx, sql).await;
+ let equivalent_sql = [
+ "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+ "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
+ ];
let expected = vec![
vec!["11", "a", "z"],
vec!["22", "b", "y"],
vec!["44", "d", "x"],
];
- assert_eq!(expected, actual);
+ for sql in equivalent_sql.iter() {
+ let actual = execute(&mut ctx, sql).await;
+ assert_eq!(expected, actual);
+ }
+ Ok(())
+}
+
+#[tokio::test]
+async fn equijoin_multiple_condition_ordering() -> Result<()> {
+ let mut ctx = create_join_context("t1_id", "t2_id")?;
+ let equivalent_sql = [
+ "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t1_name <> t2_name ORDER BY t1_id",
+ "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t2_name <> t1_name ORDER BY t1_id",
+ "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id AND t1_name <> t2_name ORDER BY t1_id",
+ "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t2_id = t1_id AND t2_name <> t1_name ORDER BY t1_id",
+ ];
+ let expected = vec![
+ vec!["11", "a", "z"],
+ vec!["22", "b", "y"],
+ vec!["44", "d", "x"],
+ ];
+ for sql in equivalent_sql.iter() {
+ let actual = execute(&mut ctx, sql).await;
+ assert_eq!(expected, actual);
+ }
Ok(())
}
@@ -1754,39 +1779,50 @@ async fn equijoin_and_unsupported_condition() -> Result<()> {
#[tokio::test]
async fn left_join() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
- let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
- let actual = execute(&mut ctx, sql).await;
+ let equivalent_sql = [
+ "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+ "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
+ ];
let expected = vec![
vec!["11", "a", "z"],
vec!["22", "b", "y"],
vec!["33", "c", "NULL"],
vec!["44", "d", "x"],
];
- assert_eq!(expected, actual);
+ for sql in equivalent_sql.iter() {
+ let actual = execute(&mut ctx, sql).await;
+ assert_eq!(expected, actual);
+ }
Ok(())
}
#[tokio::test]
async fn right_join() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
- let sql =
- "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
- let actual = execute(&mut ctx, sql).await;
+ let equivalent_sql = [
+ "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+ "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t2_id = t1_id ORDER BY t1_id"
+ ];
let expected = vec![
vec!["NULL", "NULL", "w"],
vec!["11", "a", "z"],
vec!["22", "b", "y"],
vec!["44", "d", "x"],
];
- assert_eq!(expected, actual);
+ for sql in equivalent_sql.iter() {
+ let actual = execute(&mut ctx, sql).await;
+ assert_eq!(expected, actual);
+ }
Ok(())
}
#[tokio::test]
async fn full_join() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
- let sql = "SELECT t1_id, t1_name, t2_name FROM t1 FULL JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
- let actual = execute(&mut ctx, sql).await;
+ let equivalent_sql = [
+ "SELECT t1_id, t1_name, t2_name FROM t1 FULL JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+ "SELECT t1_id, t1_name, t2_name FROM t1 FULL JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
+ ];
let expected = vec![
vec!["NULL", "NULL", "w"],
vec!["11", "a", "z"],
@@ -1794,11 +1830,19 @@ async fn full_join() -> Result<()> {
vec!["33", "c", "NULL"],
vec!["44", "d", "x"],
];
- assert_eq!(expected, actual);
+ for sql in equivalent_sql.iter() {
+ let actual = execute(&mut ctx, sql).await;
+ assert_eq!(expected, actual);
+ }
- let sql = "SELECT t1_id, t1_name, t2_name FROM t1 FULL OUTER JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
- let actual = execute(&mut ctx, sql).await;
- assert_eq!(expected, actual);
+ let equivalent_sql = [
+ "SELECT t1_id, t1_name, t2_name FROM t1 FULL OUTER JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+ "SELECT t1_id, t1_name, t2_name FROM t1 FULL OUTER JOIN t2 ON t2_id = t1_id ORDER BY t1_id",
+ ];
+ for sql in equivalent_sql.iter() {
+ let actual = execute(&mut ctx, sql).await;
+ assert_eq!(expected, actual);
+ }
Ok(())
}
@@ -1821,15 +1865,19 @@ async fn left_join_using() -> Result<()> {
#[tokio::test]
async fn equijoin_implicit_syntax() -> Result<()> {
let mut ctx = create_join_context("t1_id", "t2_id")?;
- let sql =
- "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id";
- let actual = execute(&mut ctx, sql).await;
+ let equivalent_sql = [
+ "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id",
+ "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id",
+ ];
let expected = vec![
vec!["11", "a", "z"],
vec!["22", "b", "y"],
vec!["44", "d", "x"],
];
- assert_eq!(expected, actual);
+ for sql in equivalent_sql.iter() {
+ let actual = execute(&mut ctx, sql).await;
+ assert_eq!(expected, actual);
+ }
Ok(())
}