You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/27 21:51:26 UTC
[arrow-datafusion] branch master updated: Minor: use a common method to check the validate of equijoin predicate (#4739)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b686b68ba Minor: use a common method to check the validate of equijoin predicate (#4739)
b686b68ba is described below
commit b686b68baeee964a5b797caf49484c01f0ae5b54
Author: ygf11 <ya...@gmail.com>
AuthorDate: Wed Dec 28 05:51:21 2022 +0800
Minor: use a common method to check the validate of equijoin predicate (#4739)
* Minor: use a common method to find the validate of equijoin predicate
* make check_all_column_from_schema private
* resolve merge conflict
---
datafusion/expr/src/logical_plan/builder.rs | 44 +++++-------------
datafusion/expr/src/utils.rs | 53 +++++++++++++++++++---
datafusion/optimizer/src/eliminate_cross_join.rs | 43 +++++-------------
.../optimizer/src/extract_equijoin_predicate.rs | 42 +++--------------
4 files changed, 74 insertions(+), 108 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index eeb3215c4..6bb3cf80e 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -32,8 +32,8 @@ use crate::{
Union, Values, Window,
},
utils::{
- can_hash, check_all_column_from_schema, expand_qualified_wildcard,
- expand_wildcard, group_window_expr_by_sort_keys,
+ can_hash, expand_qualified_wildcard, expand_wildcard,
+ find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
},
Expr, ExprSchemable, TableSource,
};
@@ -853,39 +853,17 @@ impl LogicalPlanBuilder {
&[right_using_columns],
)?;
- let normalized_left_using_columns = normalized_left_key.to_columns()?;
- let l_is_left = check_all_column_from_schema(
- &normalized_left_using_columns,
- self.plan.schema().clone(),
- )?;
-
- let normalized_right_using_columns = normalized_right_key.to_columns()?;
- let r_is_right = check_all_column_from_schema(
- &normalized_right_using_columns,
- right.schema().clone(),
- )?;
-
- let r_is_left_and_l_is_right = || {
- let result = check_all_column_from_schema(
- &normalized_right_using_columns,
+ // find valid equijoin
+ find_valid_equijoin_key_pair(
+ &normalized_left_key,
+ &normalized_right_key,
self.plan.schema().clone(),
- )? && check_all_column_from_schema(
- &normalized_left_using_columns,
right.schema().clone(),
- )?;
- Result::Ok(result)
- };
-
- if l_is_left && r_is_right {
- Ok((normalized_left_key, normalized_right_key))
- } else if r_is_left_and_l_is_right()?{
- Ok((normalized_right_key, normalized_left_key))
- } else {
- Err(DataFusionError::Plan(format!(
- "can't create join plan, join key should belong to one input, error key: ({},{})",
- normalized_left_key, normalized_right_key
- )))
- }
+ )?.ok_or_else(||
+ DataFusionError::Plan(format!(
+ "can't create join plan, join key should belong to one input, error key: ({},{})",
+ normalized_left_key, normalized_right_key
+ )))
})
.collect::<Result<Vec<_>>>()?;
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index ca06dfdb4..a3b240e65 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -953,15 +953,54 @@ pub fn can_hash(data_type: &DataType) -> bool {
}
/// Check whether all columns are from the schema.
-pub fn check_all_column_from_schema(
- columns: &HashSet<Column>,
- schema: DFSchemaRef,
-) -> Result<bool> {
- let result = columns
+fn check_all_column_from_schema(columns: &HashSet<Column>, schema: DFSchemaRef) -> bool {
+ columns
.iter()
- .all(|column| schema.index_of_column(column).is_ok());
+ .all(|column| schema.index_of_column(column).is_ok())
+}
- Ok(result)
+/// Given the two sides of an equijoin predicate, return a valid join key pair.
+/// If no valid join key pair exists, return `None`.
+///
+/// A key pair is valid when either:
+/// 1. every column referenced by the left key is in the left schema, and
+/// every column referenced by the right key is in the right schema; or
+/// 2. the opposite: the left key's columns are all in the right schema and the
+/// right key's columns are all in the left schema (the pair is returned swapped).
+/// Returns an error if extracting the columns of either key fails.
+pub fn find_valid_equijoin_key_pair(
+ left_key: &Expr,
+ right_key: &Expr,
+ left_schema: DFSchemaRef,
+ right_schema: DFSchemaRef,
+) -> Result<Option<(Expr, Expr)>> {
+ let left_using_columns = left_key.to_columns()?;
+ let right_using_columns = right_key.to_columns()?;
+
+ // Each side must reference at least one column, so e.g. `a = 10` yields None.
+ if left_using_columns.is_empty() || right_using_columns.is_empty() {
+ return Ok(None);
+ }
+
+ let l_is_left =
+ check_all_column_from_schema(&left_using_columns, left_schema.clone());
+ let r_is_right =
+ check_all_column_from_schema(&right_using_columns, right_schema.clone());
+
+ let r_is_left_and_l_is_right = || {
+ check_all_column_from_schema(&right_using_columns, left_schema.clone())
+ && check_all_column_from_schema(&left_using_columns, right_schema.clone())
+ };
+
+ let join_key_pair = match (l_is_left, r_is_right) {
+ (true, true) => Some((left_key.clone(), right_key.clone())),
+ (_, _) if r_is_left_and_l_is_right() => {
+ Some((right_key.clone(), left_key.clone()))
+ }
+ _ => None,
+ };
+
+ Ok(join_key_pair)
+}
#[cfg(test)]
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs
index 04481f83d..28aae51d2 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -25,7 +25,7 @@ use datafusion_expr::expr::{BinaryExpr, Expr};
use datafusion_expr::logical_plan::{
CrossJoin, Filter, Join, JoinConstraint, JoinType, LogicalPlan, Projection,
};
-use datafusion_expr::utils::{can_hash, check_all_column_from_schema};
+use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
use datafusion_expr::{and, build_join_schema, or, ExprSchemable, Operator};
#[derive(Default)]
@@ -183,42 +183,21 @@ fn find_inner_join(
let mut join_keys = vec![];
for (l, r) in &mut *possible_join_keys {
- let left_using_columns = l.to_columns()?;
- let right_using_columns = r.to_columns()?;
-
- // Conditions like a = 10, will be treated as filter.
- if left_using_columns.is_empty() || right_using_columns.is_empty() {
- continue;
- }
-
- let l_is_left = check_all_column_from_schema(
- &left_using_columns,
+ let key_pair = find_valid_equijoin_key_pair(
+ l,
+ r,
left_input.schema().clone(),
- )?;
- let r_is_right = check_all_column_from_schema(
- &right_using_columns,
right_input.schema().clone(),
)?;
- let r_is_left_and_l_is_right = || {
- let result = check_all_column_from_schema(
- &right_using_columns,
- left_input.schema().clone(),
- )? && check_all_column_from_schema(
- &left_using_columns,
- right_input.schema().clone(),
- )?;
-
- Result::Ok(result)
- };
-
// Save join keys
- if l_is_left && r_is_right && can_hash(&l.get_type(left_input.schema())?) {
- join_keys.push((l.clone(), r.clone()));
- } else if r_is_left_and_l_is_right()?
- && can_hash(&l.get_type(right_input.schema())?)
- {
- join_keys.push((r.clone(), l.clone()));
+ match key_pair {
+ Some((valid_l, valid_r)) => {
+ if can_hash(&valid_l.get_type(left_input.schema())?) {
+ join_keys.push((valid_l, valid_r));
+ }
+ }
+ _ => continue,
}
}
diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs
index 214fbe728..060cd82f6 100644
--- a/datafusion/optimizer/src/extract_equijoin_predicate.rs
+++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs
@@ -20,7 +20,7 @@ use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::DFSchema;
use datafusion_common::Result;
-use datafusion_expr::utils::{can_hash, check_all_column_from_schema};
+use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator};
use std::sync::Arc;
@@ -124,46 +124,16 @@ fn extract_join_keys(
match &expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
Operator::Eq => {
- let left = *left.clone();
- let right = *right.clone();
- let left_using_columns = left.to_columns()?;
- let right_using_columns = right.to_columns()?;
-
- // When one side key does not contain columns, we need move this expression to filter.
- // For example: a = 1, a = now() + 10.
- if left_using_columns.is_empty() || right_using_columns.is_empty() {
- accum_filter.push(expr);
- return Ok(());
- }
+ let left = left.as_ref();
+ let right = right.as_ref();
- // Checking left join key is from left schema, right join key is from right schema, or the opposite.
- let l_is_left = check_all_column_from_schema(
- &left_using_columns,
+ let join_key_pair = find_valid_equijoin_key_pair(
+ left,
+ right,
left_schema.clone(),
- )?;
- let r_is_right = check_all_column_from_schema(
- &right_using_columns,
right_schema.clone(),
)?;
- let r_is_left_and_l_is_right = || {
- let result = check_all_column_from_schema(
- &right_using_columns,
- left_schema.clone(),
- )? && check_all_column_from_schema(
- &left_using_columns,
- right_schema.clone(),
- )?;
-
- Result::Ok(result)
- };
-
- let join_key_pair = match (l_is_left, r_is_right) {
- (true, true) => Some((left, right)),
- (_, _) if r_is_left_and_l_is_right()? => Some((right, left)),
- _ => None,
- };
-
if let Some((left_expr, right_expr)) = join_key_pair {
let left_expr_type = left_expr.get_type(left_schema)?;
let right_expr_type = right_expr.get_type(right_schema)?;