Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/12 09:44:17 UTC

[GitHub] [arrow-datafusion] jackwener opened a new pull request, #4185: Reimplement `Eliminate cross join`

jackwener opened a new pull request, #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185

   # Which issue does this PR close?
   
   Closes #4176.
   
   # Rationale for this change
   
   # What changes are included in this PR?
   
   Reimplement `Eliminate cross join`
   
   # Are these changes tested?
   
   Tests `Reorder join` for eliminating cross joins.
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1025978614


##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,209 @@ impl ReduceCrossJoin {
     }
 }
 
+/// Attempt to reorder join to reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
 impl OptimizerRule for ReduceCrossJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        let mut possible_join_keys: Vec<(Column, Column)> = vec![];
-        let mut all_join_keys = HashSet::new();
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let input = (**filter.input()).clone();
+
+                let mut possible_join_keys: Vec<(Column, Column)> = vec![];
+                let mut all_inputs: Vec<LogicalPlan> = vec![];
+                match &input {
+                    LogicalPlan::Join(join) => {
+                        if join.join_type != JoinType::Inner {
+                            return utils::optimize_children(
+                                self,
+                                plan,
+                                _optimizer_config,
+                            );
+                        }
+                        flatten_join_inputs(
+                            &input,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        )?;
+                    }
+                    LogicalPlan::CrossJoin(_) => {
+                        flatten_join_inputs(
+                            &input,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        )?;
+                    }
+                    _ => {
+                        return utils::optimize_children(self, plan, _optimizer_config);
+                    }
+                }
+
+                let predicate = filter.predicate();
+                // join keys are handled locally
+                let mut all_join_keys: HashSet<(Column, Column)> = HashSet::new();
+
+                extract_possible_join_keys(predicate, &mut possible_join_keys);
+
+                let mut left = all_inputs.remove(0);
+                while !all_inputs.is_empty() {
+                    left = find_inner_join(
+                        &left,
+                        &mut all_inputs,
+                        &mut possible_join_keys,
+                        &mut all_join_keys,
+                    )?;
+                }
 
-        reduce_cross_join(self, plan, &mut possible_join_keys, &mut all_join_keys)
+                left = utils::optimize_children(self, &left, _optimizer_config)?;
+                if plan.schema() != left.schema() {
+                    left = LogicalPlan::Projection(Projection::new_from_schema(
+                        Arc::new(left.clone()),
+                        plan.schema().clone(),
+                        None,
+                    ));
+                }
+
+                // if there are no join keys then do nothing.
+                if all_join_keys.is_empty() {
+                    Ok(LogicalPlan::Filter(Filter::try_new(
+                        predicate.clone(),
+                        Arc::new(left),
+                    )?))
+                } else {
+                    // remove join expressions from filter
+                    match remove_join_expressions(predicate, &all_join_keys)? {
+                        Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
+                            filter_expr,
+                            Arc::new(left),
+                        )?)),
+                        _ => Ok(left),
+                    }
+                }
+            }
+
+            _ => utils::optimize_children(self, plan, _optimizer_config),
+        }
     }
 
     fn name(&self) -> &str {
         "reduce_cross_join"
     }
 }
 
-/// Attempt to reduce cross joins to inner joins.
-/// for queries:
-/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
-/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
-/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
-/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
-/// For above queries, the join predicate is available in filters and they are moved to
-/// join nodes appropriately
-/// This fix helps to improve the performance of TPCH Q19. issue#78
-///
-fn reduce_cross_join(
-    _optimizer: &ReduceCrossJoin,
+fn flatten_join_inputs(
     plan: &LogicalPlan,
     possible_join_keys: &mut Vec<(Column, Column)>,
-    all_join_keys: &mut HashSet<(Column, Column)>,
-) -> Result<LogicalPlan> {
-    match plan {
-        LogicalPlan::Filter(filter) => {
-            let input = filter.input();
-            let predicate = filter.predicate();
-            // join keys are handled locally
-            let mut new_possible_join_keys: Vec<(Column, Column)> = vec![];
-            let mut new_all_join_keys = HashSet::new();
-
-            extract_possible_join_keys(predicate, &mut new_possible_join_keys);
-
-            let new_plan = reduce_cross_join(
-                _optimizer,
-                input,
-                &mut new_possible_join_keys,
-                &mut new_all_join_keys,
-            )?;
-
-            // if there are no join keys then do nothing.
-            if new_all_join_keys.is_empty() {
-                Ok(LogicalPlan::Filter(Filter::try_new(
-                    predicate.clone(),
-                    Arc::new(new_plan),
-                )?))
-            } else {
-                // remove join expressions from filter
-                match remove_join_expressions(predicate, &new_all_join_keys)? {
-                    Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
-                        filter_expr,
-                        Arc::new(new_plan),
-                    )?)),
-                    _ => Ok(new_plan),
-                }
+    all_inputs: &mut Vec<LogicalPlan>,
+) -> Result<()> {
+    let children = match plan {
+        LogicalPlan::Join(join) => {
+            for join_keys in join.on.iter() {
+                possible_join_keys.push(join_keys.clone());
             }
+            let left = &*(join.left);
+            let right = &*(join.right);
+            Ok::<Vec<&LogicalPlan>, DataFusionError>(vec![left, right])
         }
-        LogicalPlan::CrossJoin(cross_join) => {
-            let left_plan = reduce_cross_join(
-                _optimizer,
-                &cross_join.left,
-                possible_join_keys,
-                all_join_keys,
-            )?;
-            let right_plan = reduce_cross_join(
-                _optimizer,
-                &cross_join.right,
-                possible_join_keys,
-                all_join_keys,
-            )?;
-            // can we find a match?
-            let left_schema = left_plan.schema();
-            let right_schema = right_plan.schema();
-            let mut join_keys = vec![];
-
-            for (l, r) in possible_join_keys {
-                if left_schema.field_from_column(l).is_ok()
-                    && right_schema.field_from_column(r).is_ok()
-                    && can_hash(left_schema.field_from_column(l).unwrap().data_type())
-                {
-                    join_keys.push((l.clone(), r.clone()));
-                } else if left_schema.field_from_column(r).is_ok()
-                    && right_schema.field_from_column(l).is_ok()
-                    && can_hash(left_schema.field_from_column(r).unwrap().data_type())
-                {
-                    join_keys.push((r.clone(), l.clone()));
+        LogicalPlan::CrossJoin(join) => {
+            let left = &*(join.left);
+            let right = &*(join.right);
+            Ok::<Vec<&LogicalPlan>, DataFusionError>(vec![left, right])
+        }
+        _ => {
+            return Err(DataFusionError::Plan(
+                "flatten_join_inputs just can call join/cross_join".to_string(),
+            ));
+        }
+    }?;
+
+    for child in children.iter() {
+        match *child {
+            LogicalPlan::Join(left_join) => {
+                if left_join.join_type == JoinType::Inner {
+                    flatten_join_inputs(child, possible_join_keys, all_inputs)?;
+                } else {
+                    all_inputs.push((*child).clone());
                 }
             }
+            LogicalPlan::CrossJoin(_) => {
+                flatten_join_inputs(child, possible_join_keys, all_inputs)?;
+            }
+            _ => all_inputs.push((*child).clone()),
+        }
+    }
+    Ok(())
+}

Review Comment:
   Yes, I tried this in the past, but it's complex; I can do it in a follow-up job.
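
A minimal sketch of the flattening idea behind `flatten_join_inputs` in the diff above, written against a hypothetical toy `Plan` enum instead of DataFusion's `LogicalPlan` (the enum, its variants, and the string column names are assumptions for illustration). Inner and cross joins are recursed into and their equi-join keys collected; any other node, including a non-inner join, is kept whole as one input. Unlike the diff, this sketch does not return an error for an unsupported top-level node.

```rust
/// Hypothetical toy plan used only for this sketch (not DataFusion's `LogicalPlan`).
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    /// A leaf input such as a table scan.
    Scan(String),
    /// Cartesian product of two inputs.
    CrossJoin(Box<Plan>, Box<Plan>),
    /// Inner equi-join with (left column, right column) key pairs.
    InnerJoin(Box<Plan>, Box<Plan>, Vec<(String, String)>),
    /// Any non-inner join (left/right/full); never flattened.
    OuterJoin(Box<Plan>, Box<Plan>),
}

/// Recursively flattens nested inner/cross joins into `inputs`, collecting the
/// equi-join keys of every inner join into `keys`. Any other node, including a
/// non-inner join, is pushed as a single opaque input.
fn flatten_join_inputs(plan: &Plan, keys: &mut Vec<(String, String)>, inputs: &mut Vec<Plan>) {
    match plan {
        Plan::InnerJoin(left, right, on) => {
            keys.extend(on.iter().cloned());
            flatten_join_inputs(left, keys, inputs);
            flatten_join_inputs(right, keys, inputs);
        }
        Plan::CrossJoin(left, right) => {
            flatten_join_inputs(left, keys, inputs);
            flatten_join_inputs(right, keys, inputs);
        }
        other => inputs.push(other.clone()),
    }
}
```

For `(a INNER JOIN b ON a.x = b.y) CROSS JOIN (c LEFT JOIN d)` this yields the inputs `a`, `b`, and `c LEFT JOIN d` plus the single key `(a.x, b.y)`. The left join stays intact because moving inputs across an outer join could change the query result.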





[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1020754829


##########
datafusion/sql/src/planner.rs:
##########
@@ -5135,15 +4951,15 @@ mod tests {
             AND person.state = p.state)";
 
         let expected = "Projection: person.id\
-        \n  Filter: EXISTS (<subquery>)\
+        \n  Filter: person.id = p.id AND EXISTS (<subquery>)\
         \n    Subquery:\
         \n      Projection: person.first_name\
-        \n        Filter: person.last_name = p.last_name AND person.state = p.state\
-        \n          Inner Join: person.id = p2.id\
+        \n        Filter: person.id = p2.id AND person.last_name = p.last_name AND person.state = p.state\
+        \n          CrossJoin:\
         \n            TableScan: person\
         \n            SubqueryAlias: p2\
         \n              TableScan: person\
-        \n    Inner Join: person.id = p.id\
+        \n    CrossJoin:\
         \n      TableScan: person\

Review Comment:
   Expected.
   This is only because this test just builds the plan, without running the optimizer.
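
In the optimizer rule, the step that turns the flattened inputs back into joins is the `find_inner_join` loop. That helper's body is not part of the quoted hunk, so the following is only a simplified sketch of the greedy idea, using hypothetical toy types (`Plan`, `columns`, `rebuild` are all assumptions): starting from the first input, join the current plan with the first remaining input that shares an equi-join key with it, and fall back to a cross join only when no remaining input does.

```rust
use std::collections::HashSet;

/// Hypothetical toy plan: every node knows which (qualified) columns it produces.
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    Scan { name: String, columns: Vec<String> },
    Inner { left: Box<Plan>, right: Box<Plan>, on: Vec<(String, String)> },
    Cross { left: Box<Plan>, right: Box<Plan> },
}

impl Plan {
    /// Columns produced by this node (left side first).
    fn columns(&self) -> Vec<String> {
        match self {
            Plan::Scan { columns, .. } => columns.clone(),
            Plan::Inner { left, right, .. } | Plan::Cross { left, right } => {
                let mut cols = left.columns();
                cols.extend(right.columns());
                cols
            }
        }
    }
}

/// Joins `left` with the first input in `rights` that shares an equi-join key
/// with it, falling back to a cross join with the first input if none does.
/// Keys that were used are recorded in `used`, oriented as (left, right).
fn find_inner_join(
    left: Plan,
    rights: &mut Vec<Plan>,
    keys: &[(String, String)],
    used: &mut HashSet<(String, String)>,
) -> Plan {
    let left_cols = left.columns();
    for i in 0..rights.len() {
        let right_cols = rights[i].columns();
        let mut on = Vec::new();
        for (a, b) in keys {
            if left_cols.contains(a) && right_cols.contains(b) {
                on.push((a.clone(), b.clone()));
            } else if left_cols.contains(b) && right_cols.contains(a) {
                on.push((b.clone(), a.clone()));
            }
        }
        if !on.is_empty() {
            used.extend(on.iter().cloned());
            let right = rights.remove(i);
            return Plan::Inner { left: Box::new(left), right: Box::new(right), on };
        }
    }
    // No remaining input shares a key with `left`: keep a cross join.
    let right = rights.remove(0);
    Plan::Cross { left: Box::new(left), right: Box::new(right) }
}

/// Greedily rebuilds a join tree from flattened inputs (assumes at least one input).
fn rebuild(mut inputs: Vec<Plan>, keys: &[(String, String)], used: &mut HashSet<(String, String)>) -> Plan {
    let mut left = inputs.remove(0);
    while !inputs.is_empty() {
        left = find_inner_join(left, &mut inputs, keys, used);
    }
    left
}
```

With inputs `a`, `b`, `c` (in `FROM` order) and the predicate `a.x = c.x AND b.y = c.y`, this sketch builds `(a JOIN c) JOIN b` with no cross join, even though `a` and `b` are adjacent in the `FROM` list. The used keys are recorded so that, as in the diff, they can then be removed from the remaining filter.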





[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1025978270


##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,209 @@ impl ReduceCrossJoin {
     }
 }
 
+/// Attempt to reorder join to reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
 impl OptimizerRule for ReduceCrossJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        let mut possible_join_keys: Vec<(Column, Column)> = vec![];
-        let mut all_join_keys = HashSet::new();
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let input = (**filter.input()).clone();
+
+                let mut possible_join_keys: Vec<(Column, Column)> = vec![];
+                let mut all_inputs: Vec<LogicalPlan> = vec![];
+                match &input {
+                    LogicalPlan::Join(join) => {
+                        if join.join_type != JoinType::Inner {
+                            return utils::optimize_children(
+                                self,
+                                plan,
+                                _optimizer_config,
+                            );
+                        }
+                        flatten_join_inputs(
+                            &input,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        )?;
+                    }

Review Comment:
   Like it! Fixed.





[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
Dandandan commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1020752597


##########
datafusion/sql/src/planner.rs:
##########
@@ -5135,15 +4951,15 @@ mod tests {
             AND person.state = p.state)";
 
         let expected = "Projection: person.id\
-        \n  Filter: EXISTS (<subquery>)\
+        \n  Filter: person.id = p.id AND EXISTS (<subquery>)\
         \n    Subquery:\
         \n      Projection: person.first_name\
-        \n        Filter: person.last_name = p.last_name AND person.state = p.state\
-        \n          Inner Join: person.id = p2.id\
+        \n        Filter: person.id = p2.id AND person.last_name = p.last_name AND person.state = p.state\
+        \n          CrossJoin:\
         \n            TableScan: person\
         \n            SubqueryAlias: p2\
         \n              TableScan: person\
-        \n    Inner Join: person.id = p.id\
+        \n    CrossJoin:\
         \n      TableScan: person\

Review Comment:
   Some of the queries now have more cross joins? Is this expected?





[GitHub] [arrow-datafusion] mingmwang commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1315198128

   @jackwener I will take a closer look tomorrow.




[GitHub] [arrow-datafusion] alamb commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1322356078

   > > There is specific logic in `extract_possible_join_keys()` to handle and pull up common Exprs in the Or branches. But I 
   
   > Nice idea👍! It's a great future ticket.
   
   I agree it would be a good idea for a future ticket. There is also https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs which I think tries to do the same thing
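
For reference, the OR handling mentioned in the quoted comment can be sketched as follows: a column equality is only a safe join key for an `OR` if it appears in every branch, so the branch keys are intersected (treating `a = b` and `b = a` as the same key). This is a hedged sketch on a hypothetical toy `Expr` type, not the actual `extract_possible_join_keys` code, and it only handles join keys, not the general pull-up of common expressions discussed above.

```rust
/// Hypothetical toy expression tree (not DataFusion's `Expr`).
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Col(String),
    Lit(i64),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

/// Collects `column = column` equalities that hold for every row passing the
/// predicate. Conjuncts under AND are all kept; under OR, only keys present in
/// both branches survive (`a = b` and `b = a` count as the same key).
fn possible_join_keys(expr: &Expr, out: &mut Vec<(String, String)>) {
    match expr {
        Expr::Eq(l, r) => {
            if let (Expr::Col(a), Expr::Col(b)) = (&**l, &**r) {
                out.push((a.clone(), b.clone()));
            }
        }
        Expr::And(l, r) => {
            possible_join_keys(l, out);
            possible_join_keys(r, out);
        }
        Expr::Or(l, r) => {
            let (mut left_keys, mut right_keys) = (Vec::new(), Vec::new());
            possible_join_keys(l, &mut left_keys);
            possible_join_keys(r, &mut right_keys);
            for (a, b) in left_keys {
                let flipped = (b.clone(), a.clone());
                let key = (a, b);
                if right_keys.contains(&key) || right_keys.contains(&flipped) {
                    out.push(key);
                }
            }
        }
        // Literals and bare columns contribute no join keys.
        _ => {}
    }
}
```

For the second predicate in the rule's doc comment, `(a.x = b.y AND b.xx = 100) OR (a.x = b.y AND b.xx = 200)`, it returns `[(a.x, b.y)]`, while `a.x = b.y OR a.z = c.z` yields no keys.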




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1025978614



Review Comment:
   Yes, I agree with you.
   I tried this in the past, but it's complex; I can do it in a follow-up job.





[GitHub] [arrow-datafusion] jackwener commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1312440468

   This code is inspired by [my PR for Doris](https://github.com/apache/doris/pull/13353); I simplified it.




[GitHub] [arrow-datafusion] alamb commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1321116425

   I plan to review this and other join-related PRs tomorrow. I apologize for the delays. The join work is really neat, but it is not a high priority at the moment in IOx, so I have had to prioritize other work higher and do join related
   
   I appreciate the help that @jackwener and @mingmwang are giving each other in the review process. 🙏 




[GitHub] [arrow-datafusion] jackwener commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1323634883

   Fixed the new conflict.




[GitHub] [arrow-datafusion] jackwener commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1318352325

   I agree with your opinion. This problem already exists in the original code (some code, such as `extract join key`, comes from the original implementation).
   Other code also has this problem, as I found in `projection push down` (#4174); this is a wide-scope problem.
   
   I am also working on some `Alias`-related jobs, such as #4232:
   - remove table-ref-alias in projection
   - fix code that doesn't consider Alias
   




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1024983937


##########
datafusion/optimizer/tests/integration-test.rs:
##########
@@ -267,6 +267,42 @@ fn propagate_empty_relation() {
     assert_eq!(expected, format!("{:?}", plan));
 }
 
+#[test]
+fn join_keys_in_subquery_alias() {

Review Comment:
   The integration test is here.





[GitHub] [arrow-datafusion] jackwener commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1312441814

   There is an important point about `Schema` that needs to be discussed.
   
   We could make `Schema` a `set` instead of a `list`, because a list-based schema requires many projections just to restore the order of fields.
   
   Lots of projections make our rules complex, since we have to consider many cases, especially for join reordering.
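   To make the trade-off concrete, here is a minimal sketch that models a schema as a plain list of qualified field names (a hypothetical stand-in, not DataFusion's `DFSchema`). It contrasts an order-sensitive comparison with an order-insensitive one, and computes the index projection that a list-based schema would need after join reordering permutes the fields.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a schema: an ordered list of qualified field names.
type Fields = Vec<String>;

/// Order-sensitive comparison, as with a list-based schema.
fn same_as_list(a: &Fields, b: &Fields) -> bool {
    a == b
}

/// Order-insensitive comparison, as with a set-based schema.
/// Assumes field names are unique within each schema.
fn same_as_set(a: &Fields, b: &Fields) -> bool {
    let sa: HashSet<&String> = a.iter().collect();
    let sb: HashSet<&String> = b.iter().collect();
    a.len() == b.len() && sa == sb
}

/// For each field of `original`, its index in `reordered`: the projection a
/// list-based schema would need to restore the original field order.
/// Returns `None` if a field of `original` is missing from `reordered`.
fn restoring_projection(original: &Fields, reordered: &Fields) -> Option<Vec<usize>> {
    original
        .iter()
        .map(|f| reordered.iter().position(|g| g == f))
        .collect()
}
```

   With `t1, t2, t3` reordered to `t1, t3, t2`, the two schemas are equal as sets but not as lists, and restoring the original order takes the projection `[0, 2, 1]`.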




[GitHub] [arrow-datafusion] alamb commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1315790658

   I will try and review this in the next day or two -- but it may take me a while. Hopefully @mingmwang can help review as well 




[GitHub] [arrow-datafusion] jackwener commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1322377423

   > I took the liberty of merging master into the branch so that we avoid any possible logical conflicts.
   
   Pulled master into it. Thanks @alamb ❤️




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1020741009


##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,243 @@ impl ReduceCrossJoin {
     }
 }
 
+/// Attempt to reorder join tp reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
 impl OptimizerRule for ReduceCrossJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        let mut possible_join_keys: Vec<(Column, Column)> = vec![];
-        let mut all_join_keys = HashSet::new();
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let mut input = (**filter.input()).clone();
+
+                // optimize children.
+                input = self.optimize(&input, _optimizer_config)?;
+
+                let mut possible_join_keys: Vec<(Column, Column)> = vec![];
+                let mut all_inputs: Vec<LogicalPlan> = vec![];
+                match &input {
+                    LogicalPlan::Join(join) => {
+                        if join.join_type != JoinType::Inner {
+                            return utils::optimize_children(
+                                self,
+                                plan,
+                                _optimizer_config,
+                            );
+                        }
+                        collect_all_inputs_from_inner(
+                            join,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        );
+                    }
+                    LogicalPlan::CrossJoin(join) => {
+                        collect_all_inputs_from_cross(
+                            join,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        );
+                    }
+                    _ => {
+                        let new_exprs = plan.expressions();
+                        let new_inputs = [input];
+                        return from_plan(plan, &new_exprs, &new_inputs);
+                    }
+                }
+
+                let predicate = filter.predicate();
+                // join keys are handled locally
+                let mut all_join_keys: HashSet<(Column, Column)> = HashSet::new();
 
-        reduce_cross_join(self, plan, &mut possible_join_keys, &mut all_join_keys)
+                extract_possible_join_keys(predicate, &mut possible_join_keys);
+
+                let mut left = all_inputs.remove(0);
+                while all_inputs.len() > 0 {
+                    left = find_inner_join(
+                        &left,
+                        &mut all_inputs,
+                        &mut possible_join_keys,
+                        &mut all_join_keys,
+                    )?;
+                }
+
+                // TODO: it's need to be discussed for Schema.
+                // if plan.schema() != left.schema() {
+                //     left = LogicalPlan::Projection(Projection::new_from_schema(
+                //         Arc::new(plan.clone()),
+                //         plan.schema().clone(),
+                //     ));
+                // }

Review Comment:
   The `Schema` handling here needs to be discussed.
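   The `while` loop over `find_inner_join` in the diff above greedily grows a join tree, and this reordering is why the output schema can differ from the original plan's. The sketch below models that loop on a toy plan type. The body of `find_inner_join` is not shown in this hunk, so its behavior here (join with the first remaining input connected by an equality key, otherwise cross join with the first remaining input) is an assumption, and `Plan`, `tables`, `table_of`, and `reorder` are hypothetical helpers.

```rust
use std::collections::HashSet;

/// Toy plan type (hypothetical; not DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Inner(Box<Plan>, Box<Plan>),
    Cross(Box<Plan>, Box<Plan>),
}

/// Names of all tables that appear under `plan`.
fn tables(plan: &Plan) -> HashSet<String> {
    match plan {
        Plan::Scan(t) => HashSet::from([t.clone()]),
        Plan::Inner(l, r) | Plan::Cross(l, r) => {
            let mut s = tables(l);
            s.extend(tables(r));
            s
        }
    }
}

/// Table prefix of a qualified column such as "a.x" (hypothetical naming scheme).
fn table_of(col: &str) -> &str {
    col.split('.').next().unwrap_or(col)
}

/// Join `left` with the first remaining input linked to it by a key;
/// fall back to a cross join with the first remaining input.
fn find_inner_join(
    left: Plan,
    rights: &mut Vec<Plan>,
    keys: &[(String, String)],
    used: &mut Vec<(String, String)>,
) -> Plan {
    let left_tables = tables(&left);
    for i in 0..rights.len() {
        let right_tables = tables(&rights[i]);
        let found: Vec<(String, String)> = keys
            .iter()
            .filter(|(l, r)| {
                (left_tables.contains(table_of(l)) && right_tables.contains(table_of(r)))
                    || (left_tables.contains(table_of(r)) && right_tables.contains(table_of(l)))
            })
            .cloned()
            .collect();
        if !found.is_empty() {
            used.extend(found);
            let right = rights.remove(i);
            return Plan::Inner(Box::new(left), Box::new(right));
        }
    }
    let right = rights.remove(0);
    Plan::Cross(Box::new(left), Box::new(right))
}

/// The driver loop; assumes `inputs` is non-empty.
fn reorder(mut inputs: Vec<Plan>, keys: &[(String, String)]) -> (Plan, Vec<(String, String)>) {
    let mut used = vec![];
    let mut left = inputs.remove(0);
    while !inputs.is_empty() {
        left = find_inner_join(left, &mut inputs, keys, &mut used);
    }
    (left, used)
}
```

   With inputs `a, b, c` and the single key `a.z = c.z`, this toy turns `(a × b) × c` into `(a ⋈ c) × b`, so the field order changes from `a, b, c` to `a, c, b`.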





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1025946159


##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,209 @@ impl ReduceCrossJoin {
     }
 }
 
+/// Attempt to reorder join tp reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
 impl OptimizerRule for ReduceCrossJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        let mut possible_join_keys: Vec<(Column, Column)> = vec![];
-        let mut all_join_keys = HashSet::new();
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let input = (**filter.input()).clone();
+
+                let mut possible_join_keys: Vec<(Column, Column)> = vec![];
+                let mut all_inputs: Vec<LogicalPlan> = vec![];
+                match &input {
+                    LogicalPlan::Join(join) => {
+                        if join.join_type != JoinType::Inner {
+                            return utils::optimize_children(
+                                self,
+                                plan,
+                                _optimizer_config,
+                            );
+                        }
+                        flatten_join_inputs(
+                            &input,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        )?;
+                    }

Review Comment:
   Maybe you can change it to:
   ````
                   match &input {
                       LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
                           flatten_join_inputs(
                               &input,
                               &mut possible_join_keys,
                               &mut all_inputs,
                           )?;
                       }
   
   ````





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1020918310


##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,235 @@ impl ReduceCrossJoin {
     }
 }
 
+/// Attempt to reorder join tp reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
 impl OptimizerRule for ReduceCrossJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        let mut possible_join_keys: Vec<(Column, Column)> = vec![];
-        let mut all_join_keys = HashSet::new();
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let mut input = (**filter.input()).clone();
+
+                // optimize children.
+                input = self.optimize(&input, _optimizer_config)?;
+

Review Comment:
   Should it optimize the children first? I think this rule should be a top-down (pre-order traversal) optimization process.
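   The difference between the two traversal orders can be shown on a toy tree. In this hypothetical sketch, `Node` stands in for a plan node and `rule` for an optimizer rule. With a top-down pass the rule at `Filter` sees the original `CrossJoin` chain before any child has been rewritten; a bottom-up pass rewrites the children first.

```rust
/// Toy plan node (hypothetical; not DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
struct Node {
    name: String,
    children: Vec<Node>,
}

/// Top-down (pre-order): apply `rule` to the node first, then recurse into
/// the children of the rewritten node. `trace` records the visit order.
fn rewrite_top_down(node: &Node, rule: &dyn Fn(&Node) -> Node, trace: &mut Vec<String>) -> Node {
    trace.push(node.name.clone());
    let rewritten = rule(node);
    let children = rewritten
        .children
        .iter()
        .map(|c| rewrite_top_down(c, rule, trace))
        .collect();
    Node { name: rewritten.name, children }
}

/// Bottom-up (post-order): rewrite the children first, then the node itself.
fn rewrite_bottom_up(node: &Node, rule: &dyn Fn(&Node) -> Node, trace: &mut Vec<String>) -> Node {
    let children = node
        .children
        .iter()
        .map(|c| rewrite_bottom_up(c, rule, trace))
        .collect();
    trace.push(node.name.clone());
    rule(&Node { name: node.name.clone(), children })
}
```

   For `Filter -> CrossJoin -> (a, b)`, the top-down visit order is `Filter, CrossJoin, a, b` and the bottom-up order is `a, b, CrossJoin, Filter`.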





[GitHub] [arrow-datafusion] alamb commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1322354297

   I took the liberty of merging master into the branch so that we avoid any possible logical conflicts. 
   
   Thanks again for all the comments @mingmwang and @Dandandan 




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1023476969


##########
datafusion/sql/src/planner.rs:
##########
@@ -2854,41 +2741,6 @@ fn normalize_sql_object_name(sql_object_name: &ObjectName) -> String {
         .join(".")
 }
 
-/// Remove join expressions from a filter expression
-fn remove_join_expressions(
-    expr: &Expr,
-    join_columns: &HashSet<(Column, Column)>,
-) -> Result<Option<Expr>> {
-    match expr {
-        Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
-            Operator::Eq => match (left.as_ref(), right.as_ref()) {
-                (Expr::Column(l), Expr::Column(r)) => {
-                    if join_columns.contains(&(l.clone(), r.clone()))
-                        || join_columns.contains(&(r.clone(), l.clone()))
-                    {
-                        Ok(None)
-                    } else {
-                        Ok(Some(expr.clone()))
-                    }
-                }
-                _ => Ok(Some(expr.clone())),
-            },
-            Operator::And => {
-                let l = remove_join_expressions(left, join_columns)?;
-                let r = remove_join_expressions(right, join_columns)?;
-                match (l, r) {
-                    (Some(ll), Some(rr)) => Ok(Some(and(ll, rr))),
-                    (Some(ll), _) => Ok(Some(ll)),
-                    (_, Some(rr)) => Ok(Some(rr)),
-                    _ => Ok(None),
-                }
-            }
-            _ => Ok(Some(expr.clone())),
-        },
-        _ => Ok(Some(expr.clone())),
-    }

Review Comment:
   I don't know who reverted this code; it originally included handling for `OR`.
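   For reference, a minimal sketch of how `OR` handling could be added to `remove_join_expressions`, using a toy `Expr` type rather than DataFusion's. It assumes a join key is only in `join_keys` if it was extracted from every `OR` branch, so a branch that reduces to nothing is implied true by the join, and so is the whole `OR`.

```rust
use std::collections::HashSet;

/// Toy expression type (hypothetical; not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Col(String),
    Lit(i64),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

fn col(name: &str) -> Expr { Expr::Col(name.to_string()) }
fn lit(v: i64) -> Expr { Expr::Lit(v) }
fn eq(l: Expr, r: Expr) -> Expr { Expr::Eq(Box::new(l), Box::new(r)) }
fn and(l: Expr, r: Expr) -> Expr { Expr::And(Box::new(l), Box::new(r)) }
fn or(l: Expr, r: Expr) -> Expr { Expr::Or(Box::new(l), Box::new(r)) }

/// Remove `col = col` predicates that are already enforced as join keys.
/// `None` means the whole expression was removed (it is implied by the join).
fn remove_join_expressions(expr: &Expr, join_keys: &HashSet<(String, String)>) -> Option<Expr> {
    match expr {
        Expr::Eq(l, r) => match (l.as_ref(), r.as_ref()) {
            (Expr::Col(l), Expr::Col(r))
                if join_keys.contains(&(l.clone(), r.clone()))
                    || join_keys.contains(&(r.clone(), l.clone())) =>
            {
                None
            }
            _ => Some(expr.clone()),
        },
        Expr::And(l, r) => {
            match (remove_join_expressions(l, join_keys), remove_join_expressions(r, join_keys)) {
                (Some(l), Some(r)) => Some(and(l, r)),
                (Some(e), None) | (None, Some(e)) => Some(e),
                (None, None) => None,
            }
        }
        // A branch reduced to `None` is true under the join, so the OR is true too.
        Expr::Or(l, r) => {
            match (remove_join_expressions(l, join_keys), remove_join_expressions(r, join_keys)) {
                (Some(l), Some(r)) => Some(or(l, r)),
                _ => None,
            }
        }
        _ => Some(expr.clone()),
    }
}
```

   Applied to `(a.x = b.y AND b.xx = 100) OR (a.x = b.y AND b.xx = 200)` with join key `(a.x, b.y)`, this leaves `b.xx = 100 OR b.xx = 200` as the remaining filter.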





[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1023477191


##########
datafusion/sql/src/planner.rs:
##########
@@ -2955,30 +2807,6 @@ fn extract_join_keys(
     }
 }
 
-/// Extract join keys from a WHERE clause
-fn extract_possible_join_keys(
-    expr: &Expr,
-    accum: &mut Vec<(Column, Column)>,
-) -> Result<()> {
-    match expr {
-        Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
-            Operator::Eq => match (left.as_ref(), right.as_ref()) {
-                (Expr::Column(l), Expr::Column(r)) => {
-                    accum.push((l.clone(), r.clone()));
-                    Ok(())
-                }
-                _ => Ok(()),
-            },
-            Operator::And => {
-                extract_possible_join_keys(left, accum)?;
-                extract_possible_join_keys(right, accum)
-            }
-            _ => Ok(()),
-        },
-        _ => Ok(()),
-    }

Review Comment:
   I don't know who reverted this code; it originally included handling for `OR`.
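   Similarly, a hedged sketch of `OR` handling in `extract_possible_join_keys`, on a toy `Expr` type (hypothetical, not DataFusion's). Keys are collected from each branch separately, and only a key found in every branch (in either column order) is kept, since only such a key is implied by the whole predicate.

```rust
/// Toy expression type (hypothetical; not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Col(String),
    Lit(i64),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

fn col(name: &str) -> Expr { Expr::Col(name.to_string()) }
fn lit(v: i64) -> Expr { Expr::Lit(v) }
fn eq(l: Expr, r: Expr) -> Expr { Expr::Eq(Box::new(l), Box::new(r)) }
fn and(l: Expr, r: Expr) -> Expr { Expr::And(Box::new(l), Box::new(r)) }
fn or(l: Expr, r: Expr) -> Expr { Expr::Or(Box::new(l), Box::new(r)) }

/// Collect `col = col` pairs implied by `expr` into `accum`.
fn extract_possible_join_keys(expr: &Expr, accum: &mut Vec<(String, String)>) {
    match expr {
        Expr::Eq(l, r) => {
            if let (Expr::Col(l), Expr::Col(r)) = (l.as_ref(), r.as_ref()) {
                accum.push((l.clone(), r.clone()));
            }
        }
        Expr::And(l, r) => {
            extract_possible_join_keys(l, accum);
            extract_possible_join_keys(r, accum);
        }
        Expr::Or(l, r) => {
            let (mut left_keys, mut right_keys) = (vec![], vec![]);
            extract_possible_join_keys(l, &mut left_keys);
            extract_possible_join_keys(r, &mut right_keys);
            // Only a key present in every OR branch is implied by the whole OR.
            for (a, b) in left_keys {
                let in_right = right_keys
                    .iter()
                    .any(|(c, d)| (a == *c && b == *d) || (a == *d && b == *c));
                if in_right {
                    accum.push((a, b));
                }
            }
        }
        _ => {}
    }
}
```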





[GitHub] [arrow-datafusion] mingmwang commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1318338606

   @jackwener The optimization process LGTM. But the handling of Columns might be problematic. One major problem lies in `Column` itself: it is not schema-aware. So when you collect Columns up and down the plan tree, a Column with the same name and type might have different meanings in different layers of the plan (because of Alias, SubQueryAlias).
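   A small illustration of the concern, assuming a column is identified only by an optional relation name plus a field name (a simplified, hypothetical model of `Column`). The same `Column` value resolves to a field in one layer's schema, to nothing in another layer where `b` now names a subquery alias, and becomes ambiguous when unqualified.

```rust
/// Simplified, hypothetical model of a column reference: optional relation + name.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    relation: Option<String>,
    name: String,
}

#[derive(Debug, PartialEq)]
enum Resolved {
    Field(usize),
    NotFound,
    Ambiguous,
}

fn column(relation: Option<&str>, name: &str) -> Column {
    Column { relation: relation.map(str::to_string), name: name.to_string() }
}

/// Resolve `col` against the fields visible at one layer of the plan.
fn resolve(col: &Column, schema: &[Column]) -> Resolved {
    let matches: Vec<usize> = schema
        .iter()
        .enumerate()
        .filter(|(_, f)| {
            f.name == col.name && (col.relation.is_none() || f.relation == col.relation)
        })
        .map(|(i, _)| i)
        .collect();
    match matches.as_slice() {
        [] => Resolved::NotFound,
        [i] => Resolved::Field(*i),
        _ => Resolved::Ambiguous,
    }
}
```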




[GitHub] [arrow-datafusion] mingmwang commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1318359043

   Could you please add more test cases covering projections with aliases and subquery aliases, to make sure Alias and Columns are handled correctly? For example:
   
   `select ... from A, (select B.y3 as y1 from A inner join B on (A.x = B.y2)) as b where (A.x = B.y1)`
   
   




[GitHub] [arrow-datafusion] jackwener commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1312461740

   cc @Dandandan @liukun4515 @andygrove @alamb 




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1020913030


##########
datafusion/sql/src/planner.rs:
##########
@@ -857,166 +858,52 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         outer_query_schema: Option<&DFSchema>,
         ctes: &mut HashMap<String, LogicalPlan>,
     ) -> Result<LogicalPlan> {
+        let cross_join_plan = if plans.len() == 1 {
+            plans[0].clone()
+        } else {
+            let mut left = plans[0].clone();
+            for right in plans.iter().skip(1) {
+                left = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
+            }
+            left
+        };
         match selection {
             Some(predicate_expr) => {
-                // build join schema
                 let mut fields = vec![];
-                let mut metadata = std::collections::HashMap::new();
+                let mut metadata = HashMap::new();
                 for plan in &plans {
                     fields.extend_from_slice(plan.schema().fields());
                     metadata.extend(plan.schema().metadata().clone());
                 }
+
                 let mut join_schema = DFSchema::new_with_metadata(fields, metadata)?;
+                let mut all_schemas: Vec<DFSchemaRef> = vec![];
+                for plan in plans {
+                    for schema in plan.all_schemas() {
+                        all_schemas.push(schema.clone());
+                    }
+                }
                 if let Some(outer) = outer_query_schema {
+                    all_schemas.push(Arc::new(outer.clone()));
                     join_schema.merge(outer);
                 }
+                let x: Vec<&DFSchemaRef> = all_schemas.iter().collect();
 
                 let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?;
+                let mut using_columns = HashSet::new();
+                expr_to_columns(&filter_expr, &mut using_columns)?;
+                let filter_expr = normalize_col_with_schemas(
+                    filter_expr,
+                    x.as_slice(),
+                    &[using_columns],
+                )?;
 
-                // look for expressions of the form `<column> = <column>`
-                let mut possible_join_keys = vec![];

Review Comment:
   Glad to see this logic moved out of the planner.
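   With key extraction gone, the planner change in this hunk only folds the `FROM` list into a left-deep chain of cross joins and leaves the rewriting to the optimizer rule. A minimal sketch of that fold on a toy `Plan` type (hypothetical, not `LogicalPlanBuilder`):

```rust
/// Toy plan type (hypothetical; not DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    CrossJoin(Box<Plan>, Box<Plan>),
}

/// Fold the `FROM` list into `((p0 x p1) x p2) x ...`; `None` for an empty list.
fn left_deep_cross_join(plans: &[Plan]) -> Option<Plan> {
    let (first, rest) = plans.split_first()?;
    Some(rest.iter().fold(first.clone(), |left, right| {
        Plan::CrossJoin(Box::new(left), Box::new(right.clone()))
    }))
}
```

   Unlike indexing `plans[0]` directly, returning `Option` avoids a panic on an empty list; whether that case can actually reach this point in the planner is not shown in the hunk.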





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1020916128


##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,235 @@ impl ReduceCrossJoin {
     }
 }
 
+/// Attempt to reorder join tp reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
 impl OptimizerRule for ReduceCrossJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        let mut possible_join_keys: Vec<(Column, Column)> = vec![];
-        let mut all_join_keys = HashSet::new();
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let mut input = (**filter.input()).clone();
+
+                // optimize children.
+                input = self.optimize(&input, _optimizer_config)?;
+
+                let mut possible_join_keys: Vec<(Column, Column)> = vec![];
+                let mut all_inputs: Vec<LogicalPlan> = vec![];
+                match &input {
+                    LogicalPlan::Join(join) => {
+                        if join.join_type != JoinType::Inner {
+                            return utils::optimize_children(
+                                self,
+                                plan,
+                                _optimizer_config,
+                            );
+                        }
+                        collect_all_inputs_from_inner(
+                            join,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        );
+                    }
+                    LogicalPlan::CrossJoin(join) => {
+                        collect_all_inputs_from_cross(
+                            join,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        );
+                    }
+                    _ => {
+                        let new_exprs = plan.expressions();
+                        let new_inputs = [input];
+                        return from_plan(plan, &new_exprs, &new_inputs);
+                    }
+                }
+
+                let predicate = filter.predicate();
+                // join keys are handled locally
+                let mut all_join_keys: HashSet<(Column, Column)> = HashSet::new();
 
-        reduce_cross_join(self, plan, &mut possible_join_keys, &mut all_join_keys)
+                extract_possible_join_keys(predicate, &mut possible_join_keys);
+
+                let mut left = all_inputs.remove(0);
+                while !all_inputs.is_empty() {
+                    left = find_inner_join(
+                        &left,
+                        &mut all_inputs,
+                        &mut possible_join_keys,
+                        &mut all_join_keys,
+                    )?;
+                }
+
+                // if there are no join keys then do nothing.
+                if all_join_keys.is_empty() {
+                    Ok(LogicalPlan::Filter(Filter::try_new(
+                        predicate.clone(),
+                        Arc::new(left),
+                    )?))
+                } else {
+                    // remove join expressions from filter
+                    match remove_join_expressions(predicate, &all_join_keys)? {
+                        Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
+                            filter_expr,
+                            Arc::new(left),
+                        )?)),
+                        _ => Ok(left),
+                    }
+                }
+            }
+
+            _ => utils::optimize_children(self, plan, _optimizer_config),
+        }
     }
 
     fn name(&self) -> &str {
         "reduce_cross_join"
     }
 }
 
-/// Attempt to reduce cross joins to inner joins.
-/// for queries:
-/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
-/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
-/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
-/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
-/// For above queries, the join predicate is available in filters and they are moved to
-/// join nodes appropriately
-/// This fix helps to improve the performance of TPCH Q19. issue#78
-///
-fn reduce_cross_join(
-    _optimizer: &ReduceCrossJoin,
-    plan: &LogicalPlan,
-    possible_join_keys: &mut Vec<(Column, Column)>,
-    all_join_keys: &mut HashSet<(Column, Column)>,
-) -> Result<LogicalPlan> {
-    match plan {
-        LogicalPlan::Filter(filter) => {
-            let input = filter.input();
-            let predicate = filter.predicate();
-            // join keys are handled locally
-            let mut new_possible_join_keys: Vec<(Column, Column)> = vec![];
-            let mut new_all_join_keys = HashSet::new();
-
-            extract_possible_join_keys(predicate, &mut new_possible_join_keys);
-
-            let new_plan = reduce_cross_join(
-                _optimizer,
-                input,
-                &mut new_possible_join_keys,
-                &mut new_all_join_keys,
-            )?;
-
-            // if there are no join keys then do nothing.
-            if new_all_join_keys.is_empty() {
-                Ok(LogicalPlan::Filter(Filter::try_new(
-                    predicate.clone(),
-                    Arc::new(new_plan),
-                )?))
+fn collect_all_inputs_from_inner(
+    join: &Join,
+    all_join_keys: &mut Vec<(Column, Column)>,

Review Comment:
   It seems `collect_all_inputs_from_inner()` and `collect_all_inputs_from_cross()` have a similar structure; can we combine the two methods into one?
   Maybe we could name it `flatten_join_inputs()`.
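   One possible shape for the suggested `flatten_join_inputs()`, sketched on a toy plan type whose enum and variants are hypothetical. Inner and cross joins are recursed into, inner-join keys are accumulated, and anything else, including non-inner joins, is treated as an opaque input.

```rust
/// Toy plan type (hypothetical; not DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    InnerJoin { left: Box<Plan>, right: Box<Plan>, on: Vec<(String, String)> },
    LeftJoin { left: Box<Plan>, right: Box<Plan>, on: Vec<(String, String)> },
    CrossJoin { left: Box<Plan>, right: Box<Plan> },
}

/// Collect the leaves of a tree of inner/cross joins, plus the inner-join keys.
fn flatten_join_inputs(plan: &Plan, keys: &mut Vec<(String, String)>, inputs: &mut Vec<Plan>) {
    match plan {
        Plan::InnerJoin { left, right, on } => {
            keys.extend(on.iter().cloned());
            flatten_join_inputs(left, keys, inputs);
            flatten_join_inputs(right, keys, inputs);
        }
        Plan::CrossJoin { left, right } => {
            flatten_join_inputs(left, keys, inputs);
            flatten_join_inputs(right, keys, inputs);
        }
        // Scans and non-inner joins cannot be reordered, so they stay whole.
        other => inputs.push(other.clone()),
    }
}
```

   A single function handles both join kinds because the only difference is whether there are `on` keys to collect.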





[GitHub] [arrow-datafusion] mingmwang commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1319502555

   There is specific logic in `extract_possible_join_keys()` to handle and pull up common Exprs in `OR` branches.
   But I think this logic should be handled in a common rule like `ExprSimplifier`.
   @alamb 
   Once we reach the other rules, Exprs in Filters or Join conditions should already be in simplified form, for example:
   
   ````
   (A.a = B.b and B.x = xxx) or (A.a = B.b and B.y = yyy)
   ````
   To
   ````
   (A.a = B.b) and (B.x = xxx or B.y = yyy)
   ````
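   A minimal sketch of that rewrite on a toy `Expr` type, pulling conjuncts shared by every `OR` branch out in front. This illustrates the idea only and is not `ExprSimplifier`'s actual API or implementation. If some branch has nothing left after factoring, that branch is `true`, so the whole `OR` collapses to the common part.

```rust
/// Toy expression type (hypothetical; not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Col(String),
    Lit(i64),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

fn col(name: &str) -> Expr { Expr::Col(name.to_string()) }
fn lit(v: i64) -> Expr { Expr::Lit(v) }
fn eq(l: Expr, r: Expr) -> Expr { Expr::Eq(Box::new(l), Box::new(r)) }
fn and(l: Expr, r: Expr) -> Expr { Expr::And(Box::new(l), Box::new(r)) }
fn or(l: Expr, r: Expr) -> Expr { Expr::Or(Box::new(l), Box::new(r)) }

/// Flatten nested ANDs (`split_and == true`) or nested ORs into a list.
fn split(expr: &Expr, split_and: bool, out: &mut Vec<Expr>) {
    match (expr, split_and) {
        (Expr::And(l, r), true) | (Expr::Or(l, r), false) => {
            split(l, split_and, out);
            split(r, split_and, out);
        }
        _ => out.push(expr.clone()),
    }
}

/// Rewrite `(A and X) or (A and Y)` into `A and (X or Y)`.
fn extract_common_conjuncts(expr: &Expr) -> Expr {
    let mut branches = vec![];
    split(expr, false, &mut branches);
    if branches.len() < 2 {
        return expr.clone();
    }
    let conjuncts: Vec<Vec<Expr>> = branches
        .iter()
        .map(|b| {
            let mut cs = vec![];
            split(b, true, &mut cs);
            cs
        })
        .collect();
    // Conjuncts of the first branch that also appear in every other branch.
    let common: Vec<Expr> = conjuncts[0]
        .iter()
        .filter(|c| conjuncts.iter().all(|cs| cs.contains(*c)))
        .cloned()
        .collect();
    if common.is_empty() {
        return expr.clone();
    }
    let rests: Vec<Vec<Expr>> = conjuncts
        .iter()
        .map(|cs| cs.iter().filter(|c| !common.contains(*c)).cloned().collect::<Vec<Expr>>())
        .collect();
    let common_expr = common.into_iter().reduce(and).expect("non-empty");
    // A branch with nothing left is `true`, so the whole OR is `true`.
    if rests.iter().any(|r| r.is_empty()) {
        return common_expr;
    }
    let disjunction = rests
        .into_iter()
        .map(|r| r.into_iter().reduce(and).expect("non-empty"))
        .reduce(or)
        .expect("at least two branches");
    and(common_expr, disjunction)
}
```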
   




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1028287950


##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -849,14 +955,14 @@ mod tests {
 
         let expected = vec![
             "Filter: (t4.c < UInt32(15) OR t4.c = UInt32(688)) AND (t4.c < UInt32(15) OR t3.c = UInt32(688) OR t3.b = t4.b) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
-            "  Inner Join: t1.a = t3.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",

Review Comment:
   nice



##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,202 @@ impl ReduceCrossJoin {
     }
 }
 
+/// Attempt to reorder join tp reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
 impl OptimizerRule for ReduceCrossJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        let mut possible_join_keys: Vec<(Column, Column)> = vec![];
-        let mut all_join_keys = HashSet::new();
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let input = (**filter.input()).clone();
+
+                let mut possible_join_keys: Vec<(Column, Column)> = vec![];
+                let mut all_inputs: Vec<LogicalPlan> = vec![];
+                match &input {
+                    LogicalPlan::Join(join) if (join.join_type == JoinType::Inner) => {
+                        flatten_join_inputs(
+                            &input,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        )?;
+                    }
+                    LogicalPlan::CrossJoin(_) => {
+                        flatten_join_inputs(
+                            &input,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        )?;
+                    }
+                    _ => {
+                        return utils::optimize_children(self, plan, _optimizer_config);
+                    }
+                }
+
+                let predicate = filter.predicate();
+                // join keys are handled locally
+                let mut all_join_keys: HashSet<(Column, Column)> = HashSet::new();
+
+                extract_possible_join_keys(predicate, &mut possible_join_keys);
+
+                let mut left = all_inputs.remove(0);
+                while !all_inputs.is_empty() {
+                    left = find_inner_join(
+                        &left,
+                        &mut all_inputs,
+                        &mut possible_join_keys,
+                        &mut all_join_keys,
+                    )?;
+                }
 
-        reduce_cross_join(self, plan, &mut possible_join_keys, &mut all_join_keys)
+                left = utils::optimize_children(self, &left, _optimizer_config)?;
+                if plan.schema() != left.schema() {
+                    left = LogicalPlan::Projection(Projection::new_from_schema(
+                        Arc::new(left.clone()),
+                        plan.schema().clone(),
+                        None,
+                    ));
+                }
+
+                // if there are no join keys then do nothing.
+                if all_join_keys.is_empty() {
+                    Ok(LogicalPlan::Filter(Filter::try_new(
+                        predicate.clone(),
+                        Arc::new(left),
+                    )?))
+                } else {
+                    // remove join expressions from filter
+                    match remove_join_expressions(predicate, &all_join_keys)? {
+                        Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
+                            filter_expr,
+                            Arc::new(left),
+                        )?)),
+                        _ => Ok(left),
+                    }
+                }
+            }
+
+            _ => utils::optimize_children(self, plan, _optimizer_config),
+        }
     }
 
     fn name(&self) -> &str {
         "reduce_cross_join"
     }
 }
 
-/// Attempt to reduce cross joins to inner joins.
-/// for queries:
-/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
-/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
-/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
-/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
-/// For above queries, the join predicate is available in filters and they are moved to
-/// join nodes appropriately
-/// This fix helps to improve the performance of TPCH Q19. issue#78
-///
-fn reduce_cross_join(
-    _optimizer: &ReduceCrossJoin,
+fn flatten_join_inputs(
     plan: &LogicalPlan,
     possible_join_keys: &mut Vec<(Column, Column)>,
-    all_join_keys: &mut HashSet<(Column, Column)>,
-) -> Result<LogicalPlan> {
-    match plan {
-        LogicalPlan::Filter(filter) => {
-            let input = filter.input();
-            let predicate = filter.predicate();
-            // join keys are handled locally
-            let mut new_possible_join_keys: Vec<(Column, Column)> = vec![];
-            let mut new_all_join_keys = HashSet::new();
-
-            extract_possible_join_keys(predicate, &mut new_possible_join_keys);
-
-            let new_plan = reduce_cross_join(
-                _optimizer,
-                input,
-                &mut new_possible_join_keys,
-                &mut new_all_join_keys,
-            )?;
-
-            // if there are no join keys then do nothing.
-            if new_all_join_keys.is_empty() {
-                Ok(LogicalPlan::Filter(Filter::try_new(
-                    predicate.clone(),
-                    Arc::new(new_plan),
-                )?))
-            } else {
-                // remove join expressions from filter
-                match remove_join_expressions(predicate, &new_all_join_keys)? {
-                    Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
-                        filter_expr,
-                        Arc::new(new_plan),
-                    )?)),
-                    _ => Ok(new_plan),
-                }
+    all_inputs: &mut Vec<LogicalPlan>,
+) -> Result<()> {
+    let children = match plan {
+        LogicalPlan::Join(join) => {
+            for join_keys in join.on.iter() {
+                possible_join_keys.push(join_keys.clone());
             }
+            let left = &*(join.left);
+            let right = &*(join.right);
+            Ok::<Vec<&LogicalPlan>, DataFusionError>(vec![left, right])
         }
-        LogicalPlan::CrossJoin(cross_join) => {
-            let left_plan = reduce_cross_join(
-                _optimizer,
-                &cross_join.left,
-                possible_join_keys,
-                all_join_keys,
-            )?;
-            let right_plan = reduce_cross_join(
-                _optimizer,
-                &cross_join.right,
-                possible_join_keys,
-                all_join_keys,
-            )?;
-            // can we find a match?
-            let left_schema = left_plan.schema();
-            let right_schema = right_plan.schema();
-            let mut join_keys = vec![];
-
-            for (l, r) in possible_join_keys {
-                if left_schema.field_from_column(l).is_ok()
-                    && right_schema.field_from_column(r).is_ok()
-                    && can_hash(left_schema.field_from_column(l).unwrap().data_type())
-                {
-                    join_keys.push((l.clone(), r.clone()));
-                } else if left_schema.field_from_column(r).is_ok()
-                    && right_schema.field_from_column(l).is_ok()
-                    && can_hash(left_schema.field_from_column(r).unwrap().data_type())
-                {
-                    join_keys.push((r.clone(), l.clone()));
+        LogicalPlan::CrossJoin(join) => {
+            let left = &*(join.left);
+            let right = &*(join.right);
+            Ok::<Vec<&LogicalPlan>, DataFusionError>(vec![left, right])
+        }
+        _ => {
+            return Err(DataFusionError::Plan(
+                "flatten_join_inputs just can call join/cross_join".to_string(),
+            ));
+        }
+    }?;
+
+    for child in children.iter() {
+        match *child {
+            LogicalPlan::Join(left_join) => {
+                if left_join.join_type == JoinType::Inner {
+                    flatten_join_inputs(child, possible_join_keys, all_inputs)?;
+                } else {
+                    all_inputs.push((*child).clone());
                 }
             }
+            LogicalPlan::CrossJoin(_) => {
+                flatten_join_inputs(child, possible_join_keys, all_inputs)?;
+            }
+            _ => all_inputs.push((*child).clone()),

Review Comment:
   Eventually it would be awesome to avoid so much cloning, maybe as a follow-on PR.
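A minimal sketch of the greedy loop in this hunk (`all_inputs.remove(0)` followed by repeated `find_inner_join` calls), written so that inputs are moved out of the vector rather than cloned. `Plan`, `keys_between`, and `greedy_reorder` are hypothetical stand-ins, not DataFusion's `LogicalPlan` API. Columns are plain qualified strings, the `can_hash` check on key types is omitted, and falling back to a cross join when nothing matches is an assumption of the sketch.

```rust
use std::collections::HashSet;

/// Toy stand-in for a logical plan (hypothetical; not DataFusion's `LogicalPlan`).
/// Columns are qualified names such as "a.x".
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { columns: Vec<String> },
    Join { left: Box<Plan>, right: Box<Plan>, on: Vec<(String, String)> },
    Cross { left: Box<Plan>, right: Box<Plan> },
}

impl Plan {
    /// Build a scan of `table` exposing `table.col` for each column name.
    fn scan(table: &str, cols: &[&str]) -> Plan {
        Plan::Scan { columns: cols.iter().map(|c| format!("{table}.{c}")).collect() }
    }

    /// All qualified column names this plan outputs.
    fn columns(&self) -> HashSet<String> {
        match self {
            Plan::Scan { columns } => columns.iter().cloned().collect(),
            Plan::Join { left, right, .. } | Plan::Cross { left, right } => {
                let mut cols = left.columns();
                cols.extend(right.columns());
                cols
            }
        }
    }
}

/// Candidate keys that connect `left` to `right`, oriented as (left col, right col).
fn keys_between(
    left: &HashSet<String>,
    right: &HashSet<String>,
    candidates: &[(String, String)],
) -> Vec<(String, String)> {
    candidates
        .iter()
        .filter_map(|(a, b)| {
            if left.contains(a) && right.contains(b) {
                Some((a.clone(), b.clone()))
            } else if left.contains(b) && right.contains(a) {
                Some((b.clone(), a.clone()))
            } else {
                None
            }
        })
        .collect()
}

/// Start from the first input, then repeatedly join the first remaining input
/// that shares a key with the accumulated plan, falling back to a cross join
/// when none does. Inputs are moved out of `inputs`, never cloned.
fn greedy_reorder(mut inputs: Vec<Plan>, keys: &[(String, String)]) -> Option<Plan> {
    if inputs.is_empty() {
        return None;
    }
    let mut left = inputs.remove(0);
    while !inputs.is_empty() {
        let left_cols = left.columns();
        let found = inputs.iter().enumerate().find_map(|(i, right)| {
            let on = keys_between(&left_cols, &right.columns(), keys);
            if on.is_empty() { None } else { Some((i, on)) }
        });
        left = match found {
            Some((i, on)) => {
                let right = inputs.remove(i); // moved, not cloned
                Plan::Join { left: Box::new(left), right: Box::new(right), on }
            }
            None => {
                let right = inputs.remove(0);
                Plan::Cross { left: Box::new(left), right: Box::new(right) }
            }
        };
    }
    Some(left)
}
```

With inputs in FROM order `a, b, c` and keys `a.x = c.z`, `b.y = c.z`, the sketch produces `(a JOIN c) JOIN b` instead of starting with a cross join of `a` and `b`.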



##########
datafusion/sql/src/planner.rs:
##########
@@ -2955,30 +2807,6 @@ fn extract_join_keys(
     }
 }
 
-/// Extract join keys from a WHERE clause
-fn extract_possible_join_keys(
-    expr: &Expr,
-    accum: &mut Vec<(Column, Column)>,
-) -> Result<()> {
-    match expr {
-        Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
-            Operator::Eq => match (left.as_ref(), right.as_ref()) {
-                (Expr::Column(l), Expr::Column(r)) => {
-                    accum.push((l.clone(), r.clone()));
-                    Ok(())
-                }
-                _ => Ok(()),
-            },
-            Operator::And => {
-                extract_possible_join_keys(left, accum)?;
-                extract_possible_join_keys(right, accum)
-            }
-            _ => Ok(()),
-        },
-        _ => Ok(()),
-    }

Review Comment:
   Maybe you are thinking about this copy: https://github.com/apache/arrow-datafusion/blob/bcd624855778384ee27648161de73951e3fb6ea1/datafusion/optimizer/src/reduce_cross_join.rs#L238-L279
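For reference, the planner helper deleted in this hunk only descends through `AND` and records `column = column` equalities. A minimal sketch of that behavior over a toy `Expr` enum (hypothetical, not DataFusion's `Expr`; the `col`/`eq`/`and`/`or` constructors are illustrative helpers):

```rust
/// Toy expression tree (hypothetical; not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

/// Collect `(left, right)` pairs from `col = col` terms reachable through ANDs only.
fn extract_possible_join_keys(expr: &Expr, accum: &mut Vec<(String, String)>) {
    match expr {
        Expr::Eq(l, r) => {
            if let (Expr::Column(l), Expr::Column(r)) = (l.as_ref(), r.as_ref()) {
                accum.push((l.clone(), r.clone()));
            }
        }
        Expr::And(l, r) => {
            extract_possible_join_keys(l, accum);
            extract_possible_join_keys(r, accum);
        }
        // Literals, bare columns and ORs contribute no keys.
        _ => {}
    }
}

fn col(name: &str) -> Expr { Expr::Column(name.to_string()) }
fn eq(l: Expr, r: Expr) -> Expr { Expr::Eq(Box::new(l), Box::new(r)) }
fn and(l: Expr, r: Expr) -> Expr { Expr::And(Box::new(l), Box::new(r)) }
fn or(l: Expr, r: Expr) -> Expr { Expr::Or(Box::new(l), Box::new(r)) }
```

Equalities under an `OR` are skipped because they do not hold for every row; pulling common terms out of `OR` branches is a separate rewrite.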



##########
benchmarks/expected-plans/q2.txt:
##########
@@ -1,24 +1,25 @@
 Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST
   Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment
-    Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value
-      Inner Join: nation.n_regionkey = region.r_regionkey
-        Inner Join: supplier.s_nationkey = nation.n_nationkey
-          Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-            Inner Join: part.p_partkey = partsupp.ps_partkey
-              Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8("%BRASS")
-                TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size]
-              TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
-            TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
-          TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
-        Filter: region.r_name = Utf8("EUROPE")
-          TableScan: region projection=[r_regionkey, r_name]
-      Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value, alias=__sq_1
-        Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
-          Inner Join: nation.n_regionkey = region.r_regionkey
-            Inner Join: supplier.s_nationkey = nation.n_nationkey
-              Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+    Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name

Review Comment:
   I found the difference much easier to see with a whitespace-blind diff:
   
   https://github.com/apache/arrow-datafusion/pull/4185/files?w=1
   
   GitHub's default rendering made the change in the plan really hard to follow, so I drew out the join graphs by hand to make sure they were the same.
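Part of that hand check can be mechanized: treat each `Inner Join:` condition as an unordered pair of columns and compare the resulting sets, which ignores join order and which side each key sits on. This is a hypothetical helper working on condition pairs copied by hand from the plan text (there is no plan parser here), and it says nothing about filters, projections, or subqueries.

```rust
use std::collections::BTreeSet;

/// Normalize `l = r` so that the same predicate written either way round compares equal.
fn normalize(l: &str, r: &str) -> (String, String) {
    if l <= r {
        (l.to_string(), r.to_string())
    } else {
        (r.to_string(), l.to_string())
    }
}

/// The join graph of a plan, as the set of its normalized equi-join conditions.
fn join_graph(conditions: &[(&str, &str)]) -> BTreeSet<(String, String)> {
    conditions.iter().map(|&(l, r)| normalize(l, r)).collect()
}

/// True if both plans join on the same predicates, regardless of order or key side.
fn same_join_graph(a: &[(&str, &str)], b: &[(&str, &str)]) -> bool {
    join_graph(a) == join_graph(b)
}
```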




[GitHub] [arrow-datafusion] alamb commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1322356357

   cc @xudong963 



[GitHub] [arrow-datafusion] alamb merged pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
alamb merged PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185



[GitHub] [arrow-datafusion] ursabot commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1324325296

   Benchmark runs are scheduled for baseline = bfce076527e40e51357b3815ed367a28e8b73b3a and contender = d355f69aae2cc951cfd021e5c0b690861ba0c4ac. d355f69aae2cc951cfd021e5c0b690861ba0c4ac is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/a95e3da0b6c546e5b346cefe6ce2e366...7c7e550bb2ec46cb869b05245af55fc8/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/255da1432cff4a5c8dec0d8a5d6a93cc...be06e78e27384c9f8511163961e47bb1/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/4df4d81602e742f6b8d5c467f97c8dfe...5f530d6efd5045f48aa451dd64b30ea8/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/2498e32b58b545c79a022a76ac9588f6...be7fef3816474dc3a600755d7e14964e/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   



[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1020912677


##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1145,6 +1145,22 @@ impl Projection {
         })
     }
 
+    /// Create a new Projection using the specified output schema
+    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
+        let expr: Vec<Expr> = schema

Review Comment:
   I don't see anywhere this method is used. If we have logic to reorder joins, we might need to add an extra Projection when the output schema differs from that of the original plan.
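The later revision of the rule does this via `if plan.schema() != left.schema()` and `Projection::new_from_schema`. A minimal sketch of what such a restoring projection has to compute, using plain field-name lists instead of `DFSchema` (`restoring_projection` and `project_row` are hypothetical names):

```rust
/// For each field of the original schema, find its index in the reordered
/// schema. Returns `Ok(None)` when the order already matches (no projection
/// needed) and an error if a field disappeared during reordering.
fn restoring_projection(original: &[&str], reordered: &[&str]) -> Result<Option<Vec<usize>>, String> {
    if original == reordered {
        return Ok(None);
    }
    original
        .iter()
        .map(|name| {
            reordered
                .iter()
                .position(|n| n == name)
                .ok_or_else(|| format!("column {name} missing after reorder"))
        })
        .collect::<Result<Vec<_>, _>>()
        .map(Some)
}

/// Apply the projection to one row whose values are in `reordered` order.
fn project_row<T: Clone>(row: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| row[i].clone()).collect()
}
```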




[GitHub] [arrow-datafusion] jackwener commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1315189484

   Thanks for the review, @mingmwang! ❤️
   cc @liukun4515 @Dandandan @andygrove @alamb 



[GitHub] [arrow-datafusion] jackwener commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1312954329

   Thanks @mingmwang 👍, I will fix them



[GitHub] [arrow-datafusion] mingmwang commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1312759047

   > There is an important point about `Schema` need to discuss.
   > 
   > We could make `Schema` a set instead of a list, because keeping it a list causes many projections just to fix the order of fields.
   > 
   > such as
   > 
   > ```
   > a b -> proj(join(b a))
   > a b c ->  join(proj(join(b a)) c) -> project(join(c, project (join(b a))))
   > ```
   > 
   > Lots of projections make our rules complex; we have to consider many cases, especially for join reordering.
   > 
   > A simple but efficient way is to override `==`.
   
   I think that is because the rule runs bottom-up, so lots of projections get added. If we make it top-down, only one final projection needs to be added.
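A toy comparison of the two notions of schema equality being discussed, with field lists as qualified names (hypothetical helpers, not `DFSchema`'s `==`). One caveat: set equality ignores position, so any consumer that reads columns by index would still need the original order restored, which is what a single final projection in the top-down approach provides.

```rust
use std::collections::HashSet;

/// Order-sensitive comparison, like comparing two field lists with `==`.
fn same_ordered(a: &[&str], b: &[&str]) -> bool {
    a == b
}

/// Order-insensitive comparison ("schema as a set"). A set cannot represent
/// duplicate names, so a list containing duplicates makes this return false.
fn same_as_set(a: &[&str], b: &[&str]) -> bool {
    let sa: HashSet<&str> = a.iter().copied().collect();
    let sb: HashSet<&str> = b.iter().copied().collect();
    sa.len() == a.len() && sb.len() == b.len() && sa == sb
}
```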



[GitHub] [arrow-datafusion] xudong963 commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
xudong963 commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1322861964

   > > > There is specific logic in `extract_possible_join_keys()` to handle and pull up common Exprs in the Or branches. But I
   > 
   > > Nice idea👍! It's a great future ticket.
   > 
   > I agree it would be a good idea for a future ticket. There is also https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs which I think tries to do the same thing
   
   Yes, thanks for mentioning it :)
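The rewrite referenced here turns `(p AND q) OR (p AND r)` into `p AND (q OR r)`, so that `p` becomes a top-level conjunct. A minimal sketch over predicates already in disjunctive normal form, with atoms as strings; `factor_common` and `branch` are hypothetical, and this is not the code in `rewrite_disjunctive_predicate.rs`.

```rust
/// Build one OR branch (an AND of atoms) from string slices.
fn branch(atoms: &[&str]) -> Vec<String> {
    atoms.iter().map(|a| a.to_string()).collect()
}

/// Factor atoms shared by every OR branch out in front:
/// (p AND q) OR (p AND r)  ==>  p AND (q OR r).
/// Returns the common atoms and the residual OR branches; `None` means the
/// residual OR is TRUE (some branch held only common atoms) and can be dropped.
fn factor_common(dnf: &[Vec<String>]) -> (Vec<String>, Option<Vec<Vec<String>>>) {
    let first = match dnf.first() {
        Some(first) => first,
        // An OR with no branches is FALSE; there is nothing to factor out.
        None => return (vec![], Some(vec![])),
    };
    let common: Vec<String> = first
        .iter()
        .filter(|atom| dnf.iter().all(|b| b.contains(*atom)))
        .cloned()
        .collect();
    let residual: Vec<Vec<String>> = dnf
        .iter()
        .map(|b| b.iter().filter(|a| !common.contains(*a)).cloned().collect::<Vec<String>>())
        .collect();
    if residual.iter().any(|b| b.is_empty()) {
        (common, None)
    } else {
        (common, Some(residual))
    }
}
```

For the TPCH Q19-style predicate `(a.x = b.y AND b.xx = 100) OR (a.x = b.y AND b.xx = 200)`, factoring yields `a.x = b.y AND (b.xx = 100 OR b.xx = 200)`, whose top-level equality an AND-only key extraction can then pick up.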



[GitHub] [arrow-datafusion] alamb commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1324316962

   > Happy to see the logic moved from the planner to the optimizer; nice cleanup.
   
   > Maybe @DhamoPS would like to take a look; I remember he has worked on something similar!
   
   Good call, @xudong963. I am going to merge this PR now as it has been outstanding for a long time. Thanks again @jackwener, the recent contributions are awesome.



[GitHub] [arrow-datafusion] mingmwang commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1318372059

   One possible solution in this rule: while collecting possible_join_keys, whenever you encounter a Projection, remove any join keys collected from its children that are not in the projection's output, because they are not visible to the plan above.
   
   And when you push the top-level filter conditions down into Joins, you need to respect Aliases and Projections and translate each name back to the name it had below the Projection.
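A minimal sketch of both directions described above, with a projection modeled as (output name, input column) pairs, so `a.x AS x2` becomes ("x2", "a.x") and a plain `b.y` becomes ("b.y", "b.y"). The functions are hypothetical; a real implementation would work on `Expr::Alias` and `Column` values rather than strings.

```rust
/// Join keys collected below a projection: keep only keys whose columns are
/// both projected, renamed to the names visible above the projection.
fn pull_up_keys(keys: &[(String, String)], proj: &[(String, String)]) -> Vec<(String, String)> {
    let output_name = |input: &str| {
        proj.iter().find(|(_, i)| i == input).map(|(o, _)| o.clone())
    };
    keys.iter()
        .filter_map(|(l, r)| Some((output_name(l.as_str())?, output_name(r.as_str())?)))
        .collect()
}

/// Join keys from a filter above a projection: translate output names back to
/// the input columns they alias; keys that cannot be resolved are dropped.
fn push_down_keys(keys: &[(String, String)], proj: &[(String, String)]) -> Vec<(String, String)> {
    let input_name = |output: &str| {
        proj.iter().find(|(o, _)| o == output).map(|(_, i)| i.clone())
    };
    keys.iter()
        .filter_map(|(l, r)| Some((input_name(l.as_str())?, input_name(r.as_str())?)))
        .collect()
}
```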
   



[GitHub] [arrow-datafusion] jackwener commented on pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1318374454

   I added an integration test.



[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1025948373


##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,209 @@ impl ReduceCrossJoin {
     }
 }
 
+/// Attempt to reorder joins to reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
 impl OptimizerRule for ReduceCrossJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        let mut possible_join_keys: Vec<(Column, Column)> = vec![];
-        let mut all_join_keys = HashSet::new();
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let input = (**filter.input()).clone();
+
+                let mut possible_join_keys: Vec<(Column, Column)> = vec![];
+                let mut all_inputs: Vec<LogicalPlan> = vec![];
+                match &input {
+                    LogicalPlan::Join(join) => {
+                        if join.join_type != JoinType::Inner {
+                            return utils::optimize_children(
+                                self,
+                                plan,
+                                _optimizer_config,
+                            );
+                        }
+                        flatten_join_inputs(
+                            &input,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        )?;
+                    }
+                    LogicalPlan::CrossJoin(_) => {
+                        flatten_join_inputs(
+                            &input,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        )?;
+                    }
+                    _ => {
+                        return utils::optimize_children(self, plan, _optimizer_config);
+                    }
+                }
+
+                let predicate = filter.predicate();
+                // join keys are handled locally
+                let mut all_join_keys: HashSet<(Column, Column)> = HashSet::new();
+
+                extract_possible_join_keys(predicate, &mut possible_join_keys);
+
+                let mut left = all_inputs.remove(0);
+                while !all_inputs.is_empty() {
+                    left = find_inner_join(
+                        &left,
+                        &mut all_inputs,
+                        &mut possible_join_keys,
+                        &mut all_join_keys,
+                    )?;
+                }
 
-        reduce_cross_join(self, plan, &mut possible_join_keys, &mut all_join_keys)
+                left = utils::optimize_children(self, &left, _optimizer_config)?;
+                if plan.schema() != left.schema() {
+                    left = LogicalPlan::Projection(Projection::new_from_schema(
+                        Arc::new(left.clone()),
+                        plan.schema().clone(),
+                        None,
+                    ));
+                }
+
+                // if there are no join keys then do nothing.
+                if all_join_keys.is_empty() {
+                    Ok(LogicalPlan::Filter(Filter::try_new(
+                        predicate.clone(),
+                        Arc::new(left),
+                    )?))
+                } else {
+                    // remove join expressions from filter
+                    match remove_join_expressions(predicate, &all_join_keys)? {
+                        Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
+                            filter_expr,
+                            Arc::new(left),
+                        )?)),
+                        _ => Ok(left),
+                    }
+                }
+            }
+
+            _ => utils::optimize_children(self, plan, _optimizer_config),
+        }
     }
 
     fn name(&self) -> &str {
         "reduce_cross_join"
     }
 }
 
-/// Attempt to reduce cross joins to inner joins.
-/// for queries:
-/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
-/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
-/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
-/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
-/// For above queries, the join predicate is available in filters and they are moved to
-/// join nodes appropriately
-/// This fix helps to improve the performance of TPCH Q19. issue#78
-///
-fn reduce_cross_join(
-    _optimizer: &ReduceCrossJoin,
+fn flatten_join_inputs(
     plan: &LogicalPlan,
     possible_join_keys: &mut Vec<(Column, Column)>,
-    all_join_keys: &mut HashSet<(Column, Column)>,
-) -> Result<LogicalPlan> {
-    match plan {
-        LogicalPlan::Filter(filter) => {
-            let input = filter.input();
-            let predicate = filter.predicate();
-            // join keys are handled locally
-            let mut new_possible_join_keys: Vec<(Column, Column)> = vec![];
-            let mut new_all_join_keys = HashSet::new();
-
-            extract_possible_join_keys(predicate, &mut new_possible_join_keys);
-
-            let new_plan = reduce_cross_join(
-                _optimizer,
-                input,
-                &mut new_possible_join_keys,
-                &mut new_all_join_keys,
-            )?;
-
-            // if there are no join keys then do nothing.
-            if new_all_join_keys.is_empty() {
-                Ok(LogicalPlan::Filter(Filter::try_new(
-                    predicate.clone(),
-                    Arc::new(new_plan),
-                )?))
-            } else {
-                // remove join expressions from filter
-                match remove_join_expressions(predicate, &new_all_join_keys)? {
-                    Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
-                        filter_expr,
-                        Arc::new(new_plan),
-                    )?)),
-                    _ => Ok(new_plan),
-                }
+    all_inputs: &mut Vec<LogicalPlan>,
+) -> Result<()> {
+    let children = match plan {
+        LogicalPlan::Join(join) => {
+            for join_keys in join.on.iter() {
+                possible_join_keys.push(join_keys.clone());
             }
+            let left = &*(join.left);
+            let right = &*(join.right);
+            Ok::<Vec<&LogicalPlan>, DataFusionError>(vec![left, right])
         }
-        LogicalPlan::CrossJoin(cross_join) => {
-            let left_plan = reduce_cross_join(
-                _optimizer,
-                &cross_join.left,
-                possible_join_keys,
-                all_join_keys,
-            )?;
-            let right_plan = reduce_cross_join(
-                _optimizer,
-                &cross_join.right,
-                possible_join_keys,
-                all_join_keys,
-            )?;
-            // can we find a match?
-            let left_schema = left_plan.schema();
-            let right_schema = right_plan.schema();
-            let mut join_keys = vec![];
-
-            for (l, r) in possible_join_keys {
-                if left_schema.field_from_column(l).is_ok()
-                    && right_schema.field_from_column(r).is_ok()
-                    && can_hash(left_schema.field_from_column(l).unwrap().data_type())
-                {
-                    join_keys.push((l.clone(), r.clone()));
-                } else if left_schema.field_from_column(r).is_ok()
-                    && right_schema.field_from_column(l).is_ok()
-                    && can_hash(left_schema.field_from_column(r).unwrap().data_type())
-                {
-                    join_keys.push((r.clone(), l.clone()));
+        LogicalPlan::CrossJoin(join) => {
+            let left = &*(join.left);
+            let right = &*(join.right);
+            Ok::<Vec<&LogicalPlan>, DataFusionError>(vec![left, right])
+        }
+        _ => {
+            return Err(DataFusionError::Plan(
+                "flatten_join_inputs just can call join/cross_join".to_string(),
+            ));
+        }
+    }?;
+
+    for child in children.iter() {
+        match *child {
+            LogicalPlan::Join(left_join) => {
+                if left_join.join_type == JoinType::Inner {
+                    flatten_join_inputs(child, possible_join_keys, all_inputs)?;
+                } else {
+                    all_inputs.push((*child).clone());
                 }
             }
+            LogicalPlan::CrossJoin(_) => {
+                flatten_join_inputs(child, possible_join_keys, all_inputs)?;
+            }
+            _ => all_inputs.push((*child).clone()),
+        }
+    }
+    Ok(())
+}

Review Comment:
   One case the flattening process could also cover is when the child is not just a Join (Inner or Cross),
   but a Join with a filter, or a Filter wrapping a Join. I think you can cover that case in a follow-up PR.
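A minimal sketch of the suggested extension on a toy plan type: besides recursing through inner and cross joins, it also looks through a `Filter` whose input is such a join, collecting the filter's predicate so it can be re-applied above the reordered joins. Lifting a filter above inner or cross joins does not change the result, which is why only those joins are looked through. `Plan`, `is_reorderable`, and `flatten` are hypothetical, not DataFusion's `LogicalPlan` or `flatten_join_inputs`; the plan is taken by value so inputs are moved rather than cloned.

```rust
/// Toy plan (hypothetical; not DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Join { left: Box<Plan>, right: Box<Plan>, inner: bool, on: Vec<(String, String)> },
    Cross { left: Box<Plan>, right: Box<Plan> },
    Filter { predicate: String, input: Box<Plan> },
}

/// Joins the rule may reorder: inner and cross joins.
fn is_reorderable(plan: &Plan) -> bool {
    matches!(plan, Plan::Join { inner: true, .. } | Plan::Cross { .. })
}

/// Flatten a tree of inner/cross joins into its leaf inputs, collecting join
/// keys and the predicates of Filters that sit directly on such joins.
fn flatten(
    plan: Plan,
    keys: &mut Vec<(String, String)>,
    filters: &mut Vec<String>,
    inputs: &mut Vec<Plan>,
) {
    match plan {
        Plan::Join { left, right, inner: true, on } => {
            keys.extend(on);
            flatten(*left, keys, filters, inputs);
            flatten(*right, keys, filters, inputs);
        }
        Plan::Cross { left, right } => {
            flatten(*left, keys, filters, inputs);
            flatten(*right, keys, filters, inputs);
        }
        // A filter over an inner/cross join can be lifted above the join tree.
        Plan::Filter { predicate, input } if is_reorderable(&input) => {
            filters.push(predicate);
            flatten(*input, keys, filters, inputs);
        }
        // Scans, outer joins and other operators become opaque inputs.
        other => inputs.push(other),
    }
}
```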




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1020912880


##########
datafusion/sql/src/planner.rs:
##########
@@ -857,166 +858,52 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         outer_query_schema: Option<&DFSchema>,
         ctes: &mut HashMap<String, LogicalPlan>,
     ) -> Result<LogicalPlan> {
+        let cross_join_plan = if plans.len() == 1 {
+            plans[0].clone()
+        } else {
+            let mut left = plans[0].clone();
+            for right in plans.iter().skip(1) {
+                left = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
+            }
+            left
+        };
         match selection {
             Some(predicate_expr) => {
-                // build join schema
                 let mut fields = vec![];
-                let mut metadata = std::collections::HashMap::new();
+                let mut metadata = HashMap::new();
                 for plan in &plans {
                     fields.extend_from_slice(plan.schema().fields());
                     metadata.extend(plan.schema().metadata().clone());
                 }
+
                 let mut join_schema = DFSchema::new_with_metadata(fields, metadata)?;
+                let mut all_schemas: Vec<DFSchemaRef> = vec![];
+                for plan in plans {
+                    for schema in plan.all_schemas() {
+                        all_schemas.push(schema.clone());
+                    }
+                }
                 if let Some(outer) = outer_query_schema {
+                    all_schemas.push(Arc::new(outer.clone()));
                     join_schema.merge(outer);
                 }
+                let x: Vec<&DFSchemaRef> = all_schemas.iter().collect();
 
                 let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?;
+                let mut using_columns = HashSet::new();
+                expr_to_columns(&filter_expr, &mut using_columns)?;
+                let filter_expr = normalize_col_with_schemas(
+                    filter_expr,
+                    x.as_slice(),
+                    &[using_columns],
+                )?;
 
-                // look for expressions of the form `<column> = <column>`
-                let mut possible_join_keys = vec![];

Review Comment:
   Glad to see this logic moved out of the planner.
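With join-key extraction moved out, the planner's job for `FROM t1, t2, ... WHERE pred` reduces to what the added diff lines do: fold the relations into a left-deep chain of cross joins and put the WHERE predicate in a Filter on top, leaving the rewrite to the optimizer rule. A minimal sketch on a hypothetical toy `Plan` enum (not DataFusion's `LogicalPlanBuilder` API); `plan_from_and_where` is an illustrative name, and it returns `None` for an empty FROM list instead of indexing `plans[0]`.

```rust
// Hypothetical toy plan type standing in for DataFusion's `LogicalPlan`.
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    Scan(String),
    CrossJoin(Box<Plan>, Box<Plan>),
    Filter(String, Box<Plan>),
}

/// Builds the plan for `FROM t1, t2, ..., tn [WHERE pred]`: a left-deep chain
/// of cross joins, with the WHERE predicate (if any) in one Filter on top.
fn plan_from_and_where(plans: &[Plan], selection: Option<&str>) -> Option<Plan> {
    let mut rest = plans.iter().cloned();
    let first = rest.next()?;
    // ((t1 x t2) x t3) x ...; a single relation is returned unchanged.
    let cross = rest.fold(first, |left, right| {
        Plan::CrossJoin(Box::new(left), Box::new(right))
    });
    Some(match selection {
        Some(pred) => Plan::Filter(pred.to_string(), Box::new(cross)),
        None => cross,
    })
}

fn main() {
    let plans = vec![
        Plan::Scan("a".to_string()),
        Plan::Scan("b".to_string()),
        Plan::Scan("c".to_string()),
    ];
    let plan = plan_from_and_where(&plans, Some("a.x = b.y AND a.z = c.z"));
    println!("{plan:?}");
}
```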
   
   





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1025946159


##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -44,143 +44,209 @@ impl ReduceCrossJoin {
     }
 }
 
+/// Attempt to reorder join tp reduce cross joins to inner joins.
+/// for queries:
+/// 'select ... from a, b where a.x = b.y and b.xx = 100;'
+/// 'select ... from a, b where (a.x = b.y and b.xx = 100) or (a.x = b.y and b.xx = 200);'
+/// 'select ... from a, b, c where (a.x = b.y and b.xx = 100 and a.z = c.z)
+/// or (a.x = b.y and b.xx = 200 and a.z=c.z);'
+/// For above queries, the join predicate is available in filters and they are moved to
+/// join nodes appropriately
+/// This fix helps to improve the performance of TPCH Q19. issue#78
+///
 impl OptimizerRule for ReduceCrossJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        let mut possible_join_keys: Vec<(Column, Column)> = vec![];
-        let mut all_join_keys = HashSet::new();
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let input = (**filter.input()).clone();
+
+                let mut possible_join_keys: Vec<(Column, Column)> = vec![];
+                let mut all_inputs: Vec<LogicalPlan> = vec![];
+                match &input {
+                    LogicalPlan::Join(join) => {
+                        if join.join_type != JoinType::Inner {
+                            return utils::optimize_children(
+                                self,
+                                plan,
+                                _optimizer_config,
+                            );
+                        }
+                        flatten_join_inputs(
+                            &input,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        )?;
+                    }

Review Comment:
   Maybe you can change it to use a match guard:
   ```rust
   match &input {
       LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
           flatten_join_inputs(
               &input,
               &mut possible_join_keys,
               &mut all_inputs,
           )?;
       }
       // ... other arms unchanged; a non-Inner `Join` now reaches them.
   }
   ```
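The suggestion relies on a Rust match guard: an arm with an `if` guard only matches when the guard holds, so a non-Inner `Join` falls through to the later arms instead of needing an explicit early return inside the arm. A self-contained sketch with hypothetical `JoinType`/`Input` enums; the catch-all arm is an assumption standing in for whatever the remaining arms of the real `match` do (in the diff, the non-Inner case returns `utils::optimize_children(...)`).

```rust
// Hypothetical stand-ins for `JoinType` and the Filter's input plan.
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinType {
    Inner,
    Left,
}

enum Input {
    Join(JoinType),
    CrossJoin,
    Other,
}

fn action(input: &Input) -> &'static str {
    match input {
        // The guard restricts this arm to inner joins...
        Input::Join(join_type) if *join_type == JoinType::Inner => "flatten",
        Input::CrossJoin => "flatten",
        // ...so a Left (or any non-Inner) join falls through to the catch-all,
        // which here stands in for the early `optimize_children` return.
        _ => "optimize children",
    }
}

fn main() {
    for input in [
        Input::Join(JoinType::Inner),
        Input::Join(JoinType::Left),
        Input::CrossJoin,
        Input::Other,
    ] {
        println!("{}", action(&input));
    }
}
```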





[GitHub] [arrow-datafusion] jackwener commented on pull request #4185: Add rule to reimplement `Eliminate cross join` and remove it in planner

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#issuecomment-1319533954

   > There is specific logic in `extract_possible_join_keys()` to handle and pull up common Exprs in the Or branches. But I think this logic should be handled in a common rule like `ExprSimplifier`. @alamb Once we reach the other rules, Exprs in Filters or Join conditions should already be in simplified form.
   > 
   > ```
   > (A.a = B.b and B.x = xxx) or (A.a = B.b and B.y = yyy)
   > ```
   > 
   > To
   > 
   > ```
   > (A.a = B.b) and (B.x = xxx or B.y = yyy)
   > ```
   
   Nice idea 👍! It would make a great follow-up ticket.
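The rewrite in the quote is the distributive law applied in reverse: a conjunct present in every OR branch can be pulled out in front of the OR. Below is a toy, string-based sketch of that factoring; it is not how `ExprSimplifier` works, and `factor_or`/`rewrite` are hypothetical names. Predicates are compared by exact text, so `a = b` and `b = a` would not be recognized as the same conjunct.

```rust
/// Each OR branch is a list of AND-ed atomic predicates (plain strings here).
/// Returns the predicates that appear in every branch, plus each branch with
/// those common predicates removed.
fn factor_or(branches: &[Vec<&str>]) -> (Vec<String>, Vec<Vec<String>>) {
    let mut common: Vec<String> = Vec::new();
    if let Some(first) = branches.first() {
        for p in first {
            if branches.iter().all(|b| b.contains(p)) && !common.iter().any(|c| c == p) {
                common.push(p.to_string());
            }
        }
    }
    let residual: Vec<Vec<String>> = branches
        .iter()
        .map(|b| {
            b.iter()
                .filter(|p| !common.iter().any(|c| c == *p))
                .map(|p| p.to_string())
                .collect()
        })
        .collect();
    (common, residual)
}

/// Rewrites `(c AND x) OR (c AND y)` into `(c) AND (x OR y)` in string form.
/// Residual branches are joined without parentheses, relying on AND binding
/// tighter than OR.
fn rewrite(branches: &[Vec<&str>]) -> String {
    let (common, residual) = factor_or(branches);
    let mut parts: Vec<String> = common.iter().map(|c| format!("({c})")).collect();
    // If some branch held only common predicates, the leftover OR is TRUE
    // (absorption: a OR (a AND b) = a), so it is dropped.
    if !residual.is_empty() && !residual.iter().any(|b| b.is_empty()) {
        let ors: Vec<String> = residual.iter().map(|b| b.join(" and ")).collect();
        parts.push(format!("({})", ors.join(" or ")));
    }
    parts.join(" and ")
}

fn main() {
    let branches = [
        vec!["A.a = B.b", "B.x = xxx"],
        vec!["A.a = B.b", "B.y = yyy"],
    ];
    println!("{}", rewrite(&branches));
}
```

Once the common `A.a = B.b` conjunct is at the top level of the AND, an equi-join key extractor only needs to look at top-level conjuncts.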




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4185: Reimplement `Eliminate cross join`

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4185:
URL: https://github.com/apache/arrow-datafusion/pull/4185#discussion_r1020754829


##########
datafusion/sql/src/planner.rs:
##########
@@ -5135,15 +4951,15 @@ mod tests {
             AND person.state = p.state)";
 
         let expected = "Projection: person.id\
-        \n  Filter: EXISTS (<subquery>)\
+        \n  Filter: person.id = p.id AND EXISTS (<subquery>)\
         \n    Subquery:\
         \n      Projection: person.first_name\
-        \n        Filter: person.last_name = p.last_name AND person.state = p.state\
-        \n          Inner Join: person.id = p2.id\
+        \n        Filter: person.id = p2.id AND person.last_name = p.last_name AND person.state = p.state\
+        \n          CrossJoin:\
         \n            TableScan: person\
         \n            SubqueryAlias: p2\
         \n              TableScan: person\
-        \n    Inner Join: person.id = p.id\
+        \n    CrossJoin:\
         \n      TableScan: person\

Review Comment:
   This is expected: the test only builds the logical plan and does not run the optimizer.
   `quick_test()` just invokes `logical_plan_with_dialect`.
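In the unoptimized plan above, `person.id = p.id` sits in a Filter over a CrossJoin. Turning it back into an inner-join key means checking which input each column comes from and swapping the pair when it is written right-to-left; the planner code removed by this PR (the `possible_join_keys` loop at the top of this section, with its `can_hash` check) did exactly that. A sketch of that check, using `HashSet`s of qualified column names as hypothetical stand-ins for `DFSchema::field_from_column`, and a caller-supplied closure in place of `can_hash`.

```rust
use std::collections::HashSet;

/// Keeps each `l = r` candidate whose columns come from different inputs,
/// oriented as (left-input column, right-input column). `left_cols` and
/// `right_cols` stand in for the two input schemas; `hashable` stands in for
/// the `can_hash` data-type check on the left-side column.
fn find_join_keys(
    possible_join_keys: &[(&str, &str)],
    left_cols: &HashSet<&str>,
    right_cols: &HashSet<&str>,
    hashable: impl Fn(&str) -> bool,
) -> Vec<(String, String)> {
    let mut join_keys = vec![];
    for &(l, r) in possible_join_keys {
        if left_cols.contains(l) && right_cols.contains(r) && hashable(l) {
            join_keys.push((l.to_string(), r.to_string()));
        } else if left_cols.contains(r) && right_cols.contains(l) && hashable(r) {
            // Written as `right = left` in the query: swap the pair.
            join_keys.push((r.to_string(), l.to_string()));
        }
        // Otherwise (same side, unknown column, or unhashable type) it is
        // not a join key and stays in the Filter.
    }
    join_keys
}

fn main() {
    let left: HashSet<&str> = ["person.id", "person.state"].iter().copied().collect();
    let right: HashSet<&str> = ["p.id", "p.state"].iter().copied().collect();
    let candidates = [
        ("p.id", "person.id"),
        ("person.state", "p.state"),
        ("person.id", "person.state"),
    ];
    let keys = find_join_keys(&candidates, &left, &right, |_| true);
    println!("{keys:?}");
}
```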


