You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/27 21:51:26 UTC

[arrow-datafusion] branch master updated: Minor: use a common method to check the validate of equijoin predicate (#4739)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b686b68ba Minor: use a common method to check the validate of  equijoin predicate (#4739)
b686b68ba is described below

commit b686b68baeee964a5b797caf49484c01f0ae5b54
Author: ygf11 <ya...@gmail.com>
AuthorDate: Wed Dec 28 05:51:21 2022 +0800

    Minor: use a common method to check the validate of  equijoin predicate (#4739)
    
    * Minor: use a common method to find the validate of equijoin predicate
    
    * make check_all_column_from_schema private
    
    * resolve merge conflict
---
 datafusion/expr/src/logical_plan/builder.rs        | 44 +++++-------------
 datafusion/expr/src/utils.rs                       | 53 +++++++++++++++++++---
 datafusion/optimizer/src/eliminate_cross_join.rs   | 43 +++++-------------
 .../optimizer/src/extract_equijoin_predicate.rs    | 42 +++--------------
 4 files changed, 74 insertions(+), 108 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index eeb3215c4..6bb3cf80e 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -32,8 +32,8 @@ use crate::{
         Union, Values, Window,
     },
     utils::{
-        can_hash, check_all_column_from_schema, expand_qualified_wildcard,
-        expand_wildcard, group_window_expr_by_sort_keys,
+        can_hash, expand_qualified_wildcard, expand_wildcard,
+        find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
     },
     Expr, ExprSchemable, TableSource,
 };
@@ -853,39 +853,17 @@ impl LogicalPlanBuilder {
                     &[right_using_columns],
                 )?;
 
-                let normalized_left_using_columns = normalized_left_key.to_columns()?;
-                let l_is_left = check_all_column_from_schema(
-                    &normalized_left_using_columns,
-                    self.plan.schema().clone(),
-                )?;
-
-                let normalized_right_using_columns = normalized_right_key.to_columns()?;
-                let r_is_right = check_all_column_from_schema(
-                    &normalized_right_using_columns,
-                    right.schema().clone(),
-                )?;
-
-                let r_is_left_and_l_is_right = || {
-                    let result = check_all_column_from_schema(
-                        &normalized_right_using_columns,
+                // Find a valid (left, right) equijoin key pair, swapping the keys if needed.
+                find_valid_equijoin_key_pair(
+                        &normalized_left_key,
+                        &normalized_right_key,
                         self.plan.schema().clone(),
-                    )? && check_all_column_from_schema(
-                        &normalized_left_using_columns,
                         right.schema().clone(),
-                    )?;
-                    Result::Ok(result)
-                };
-
-                if l_is_left && r_is_right {
-                    Ok((normalized_left_key, normalized_right_key))
-                } else if r_is_left_and_l_is_right()?{
-                    Ok((normalized_right_key, normalized_left_key))
-                } else {
-                    Err(DataFusionError::Plan(format!(
-                        "can't create join plan, join key should belong to one input, error key: ({},{})",
-                        normalized_left_key, normalized_right_key
-                    )))
-                }
+                    )?.ok_or_else(||
+                        DataFusionError::Plan(format!(
+                            "can't create join plan, join key should belong to one input, error key: ({},{})",
+                            normalized_left_key, normalized_right_key
+                        )))
             })
             .collect::<Result<Vec<_>>>()?;
 
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index ca06dfdb4..a3b240e65 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -953,15 +953,54 @@ pub fn can_hash(data_type: &DataType) -> bool {
 }
 
 /// Check whether all columns are from the schema.
-pub fn check_all_column_from_schema(
-    columns: &HashSet<Column>,
-    schema: DFSchemaRef,
-) -> Result<bool> {
-    let result = columns
+fn check_all_column_from_schema(columns: &HashSet<Column>, schema: DFSchemaRef) -> bool {
+    columns
         .iter()
-        .all(|column| schema.index_of_column(column).is_ok());
+        .all(|column| schema.index_of_column(column).is_ok())
+}
 
-    Ok(result)
+/// Given the two sides of an equijoin predicate, return a valid join key pair.
+/// If there is no valid join key pair, return `None`.
+///
+/// A pair is valid when either:
+/// 1. all columns referenced by the left key come from the left schema, and
+///    all columns referenced by the right key come from the right schema; or
+/// 2. the opposite holds: the left key's columns come from the right schema and
+///    the right key's columns come from the left schema (the pair is then swapped).
+///
+pub fn find_valid_equijoin_key_pair(
+    left_key: &Expr,
+    right_key: &Expr,
+    left_schema: DFSchemaRef,
+    right_schema: DFSchemaRef,
+) -> Result<Option<(Expr, Expr)>> {
+    let left_using_columns = left_key.to_columns()?;
+    let right_using_columns = right_key.to_columns()?;
+
+    // If either side references no columns (e.g. `a = 10`), it cannot form an equijoin key pair.
+    if left_using_columns.is_empty() || right_using_columns.is_empty() {
+        return Ok(None);
+    }
+
+    let l_is_left =
+        check_all_column_from_schema(&left_using_columns, left_schema.clone());
+    let r_is_right =
+        check_all_column_from_schema(&right_using_columns, right_schema.clone());
+
+    let r_is_left_and_l_is_right = || {
+        check_all_column_from_schema(&right_using_columns, left_schema.clone())
+            && check_all_column_from_schema(&left_using_columns, right_schema.clone())
+    };
+
+    let join_key_pair = match (l_is_left, r_is_right) {
+        (true, true) => Some((left_key.clone(), right_key.clone())),
+        (_, _) if r_is_left_and_l_is_right() => {
+            Some((right_key.clone(), left_key.clone()))
+        }
+        _ => None,
+    };
+
+    Ok(join_key_pair)
 }
 
 #[cfg(test)]
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs
index 04481f83d..28aae51d2 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -25,7 +25,7 @@ use datafusion_expr::expr::{BinaryExpr, Expr};
 use datafusion_expr::logical_plan::{
     CrossJoin, Filter, Join, JoinConstraint, JoinType, LogicalPlan, Projection,
 };
-use datafusion_expr::utils::{can_hash, check_all_column_from_schema};
+use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
 use datafusion_expr::{and, build_join_schema, or, ExprSchemable, Operator};
 
 #[derive(Default)]
@@ -183,42 +183,21 @@ fn find_inner_join(
         let mut join_keys = vec![];
 
         for (l, r) in &mut *possible_join_keys {
-            let left_using_columns = l.to_columns()?;
-            let right_using_columns = r.to_columns()?;
-
-            // Conditions like a = 10, will be treated as filter.
-            if left_using_columns.is_empty() || right_using_columns.is_empty() {
-                continue;
-            }
-
-            let l_is_left = check_all_column_from_schema(
-                &left_using_columns,
+            let key_pair = find_valid_equijoin_key_pair(
+                l,
+                r,
                 left_input.schema().clone(),
-            )?;
-            let r_is_right = check_all_column_from_schema(
-                &right_using_columns,
                 right_input.schema().clone(),
             )?;
 
-            let r_is_left_and_l_is_right = || {
-                let result = check_all_column_from_schema(
-                    &right_using_columns,
-                    left_input.schema().clone(),
-                )? && check_all_column_from_schema(
-                    &left_using_columns,
-                    right_input.schema().clone(),
-                )?;
-
-                Result::Ok(result)
-            };
-
             // Save join keys
-            if l_is_left && r_is_right && can_hash(&l.get_type(left_input.schema())?) {
-                join_keys.push((l.clone(), r.clone()));
-            } else if r_is_left_and_l_is_right()?
-                && can_hash(&l.get_type(right_input.schema())?)
-            {
-                join_keys.push((r.clone(), l.clone()));
+            match key_pair {
+                Some((valid_l, valid_r)) => {
+                    if can_hash(&valid_l.get_type(left_input.schema())?) {
+                        join_keys.push((valid_l, valid_r));
+                    }
+                }
+                _ => continue,
             }
         }
 
diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs
index 214fbe728..060cd82f6 100644
--- a/datafusion/optimizer/src/extract_equijoin_predicate.rs
+++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs
@@ -20,7 +20,7 @@ use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::DFSchema;
 use datafusion_common::Result;
-use datafusion_expr::utils::{can_hash, check_all_column_from_schema};
+use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
 use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator};
 use std::sync::Arc;
 
@@ -124,46 +124,16 @@ fn extract_join_keys(
     match &expr {
         Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
             Operator::Eq => {
-                let left = *left.clone();
-                let right = *right.clone();
-                let left_using_columns = left.to_columns()?;
-                let right_using_columns = right.to_columns()?;
-
-                // When one side key does not contain columns, we need move this expression to filter.
-                // For example: a = 1, a = now() + 10.
-                if left_using_columns.is_empty() || right_using_columns.is_empty() {
-                    accum_filter.push(expr);
-                    return Ok(());
-                }
+                let left = left.as_ref();
+                let right = right.as_ref();
 
-                // Checking left join key is from left schema, right join key is from right schema, or the opposite.
-                let l_is_left = check_all_column_from_schema(
-                    &left_using_columns,
+                let join_key_pair = find_valid_equijoin_key_pair(
+                    left,
+                    right,
                     left_schema.clone(),
-                )?;
-                let r_is_right = check_all_column_from_schema(
-                    &right_using_columns,
                     right_schema.clone(),
                 )?;
 
-                let r_is_left_and_l_is_right = || {
-                    let result = check_all_column_from_schema(
-                        &right_using_columns,
-                        left_schema.clone(),
-                    )? && check_all_column_from_schema(
-                        &left_using_columns,
-                        right_schema.clone(),
-                    )?;
-
-                    Result::Ok(result)
-                };
-
-                let join_key_pair = match (l_is_left, r_is_right) {
-                    (true, true) => Some((left, right)),
-                    (_, _) if r_is_left_and_l_is_right()? => Some((right, left)),
-                    _ => None,
-                };
-
                 if let Some((left_expr, right_expr)) = join_key_pair {
                     let left_expr_type = left_expr.get_type(left_schema)?;
                     let right_expr_type = right_expr.get_type(right_schema)?;