Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/05 08:42:10 UTC

[GitHub] [arrow-datafusion] jon-chuang opened a new pull request, #2451: Add exists subquery rewriting and correlated filters at filter depth 1

jon-chuang opened a new pull request, #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451

   # Which issue does this PR close?
   
   
   Closes: https://github.com/apache/arrow-datafusion/issues/2351
   Related: https://github.com/apache/arrow-datafusion/pull/2421
   
   # Rationale for this change
   We perform a simple pattern match in the `LogicalPlan` optimizer on `InSubquery(Projection(Filter(..)))` and `Exists(Filter(..))`. It extracts the correlated (dependent) columns from equality predicates and uses them as the join keys of a semi join, or of an anti join when the subquery is negated.
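A minimal sketch of the extraction step, for readers unfamiliar with the optimizer code. It uses a toy `Expr` enum and plain string column names as hypothetical stand-ins for DataFusion's `Expr` and `DFSchema`, and it treats a column as correlated when its name appears in the outer schema. Equalities between an outer and an inner column become `(outer, inner)` join keys. Every other predicate stays behind as a filter on the subquery side.

```rust
use std::collections::HashSet;

/// Toy expression type (hypothetical stand-in for DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Eq(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Flattens `A AND (B AND C)` into `[A, B, C]`.
fn split_conjunction<'a>(expr: &'a Expr, out: &mut Vec<&'a Expr>) {
    match expr {
        Expr::And(l, r) => {
            split_conjunction(l, out);
            split_conjunction(r, out);
        }
        other => out.push(other),
    }
}

/// Returns `(outer, inner)` equi-join key pairs plus the remaining predicates.
/// An `=` between two columns is a join key when one side is in `outer`.
fn extract_correlated_columns(
    expr: &Expr,
    outer: &HashSet<String>,
) -> (Vec<(String, String)>, Vec<Expr>) {
    let mut preds = vec![];
    split_conjunction(expr, &mut preds);

    let mut keys = vec![];
    let mut residual = vec![];
    for p in preds {
        if let Expr::Eq(l, r) = p {
            if let (Expr::Column(a), Expr::Column(b)) = (&**l, &**r) {
                // Normalize the pair so the outer (correlated) column comes first.
                if outer.contains(a) {
                    keys.push((a.clone(), b.clone()));
                    continue;
                } else if outer.contains(b) {
                    keys.push((b.clone(), a.clone()));
                    continue;
                }
            }
        }
        // Not an outer/inner column equality: keep it as a subquery filter.
        residual.push(p.clone());
    }
    (keys, residual)
}
```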
   
   # What changes are included in this PR?
   
   # Are there any user-facing changes?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jon-chuang commented on a diff in pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
jon-chuang commented on code in PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#discussion_r865690206


##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -348,11 +556,36 @@ mod tests {
         let expected = "Projection: #test.b [b:UInt32]\
         \n  Semi Join: #test.b = #sq.a [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Projection: #sq.a [a:UInt32]\
-        \n      Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n        TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]\
-        \n        Projection: #sq_nested.c [c:UInt32]\
-        \n          TableScan: sq_nested projection=None [a:UInt32, b:UInt32, c:UInt32]";
+        \n    Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\

Review Comment:
   Note: this is a quirk of the implementation. It removes projections below the semi join in some cases (which are inconsequential).





[GitHub] [arrow-datafusion] alamb commented on pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#issuecomment-1121151973

   I have made it halfway through this PR -- I will finish it up later today. Thanks again for your patience, @jon-chuang




[GitHub] [arrow-datafusion] alamb commented on pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#issuecomment-1119861819

   Thanks @jon-chuang -- I will try to review this carefully tomorrow




[GitHub] [arrow-datafusion] jon-chuang commented on a diff in pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
jon-chuang commented on code in PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#discussion_r865696413


##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -348,11 +556,36 @@ mod tests {
         let expected = "Projection: #test.b [b:UInt32]\
         \n  Semi Join: #test.b = #sq.a [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Projection: #sq.a [a:UInt32]\
-        \n      Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n        TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]\
-        \n        Projection: #sq_nested.c [c:UInt32]\
-        \n          TableScan: sq_nested projection=None [a:UInt32, b:UInt32, c:UInt32]";
+        \n    Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\

Review Comment:
   One way to remove the quirk is to allow for multi-column `InSubquery`s.
   
   The predicate pullup rule (next PR) will be able to "commute" a projection and a filter as follows:
   ```
   Project([col(a)])
     Filter(col(b)=col(t.b))
   =>
   Filter(col(b)=col(t.b))
     Project([col(a), col(b)])
   ```
   
   This way, the Filter can be moved to the top of the subquery tree, just as with `Exists`.
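A toy sketch of that commute step, assuming a simplified plan type with a single column-to-column equality filter. `Plan`, `commute_project_filter`, and the `is_outer` predicate are hypothetical, not DataFusion APIs. Inner columns referenced by the filter are appended to the projection. Outer columns are skipped because the subquery does not produce them.

```rust
/// Toy logical plan: the filter is a single column-to-column equality.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Project { cols: Vec<String>, input: Box<Plan> },
    Filter { left: String, right: String, input: Box<Plan> },
}

/// Rewrites `Project(cols, Filter(l = r, input))` into
/// `Filter(l = r, Project(cols + inner filter columns, input))`.
/// Any other plan shape is returned unchanged.
fn commute_project_filter(plan: Plan, is_outer: &dyn Fn(&str) -> bool) -> Plan {
    match plan {
        Plan::Project { mut cols, input } => match *input {
            Plan::Filter { left, right, input: filter_input } => {
                // Keep the inner columns the filter needs; outer columns
                // are not produced by the subquery, so they are skipped.
                for c in vec![left.clone(), right.clone()] {
                    if !is_outer(c.as_str()) && !cols.contains(&c) {
                        cols.push(c);
                    }
                }
                Plan::Filter {
                    left,
                    right,
                    input: Box::new(Plan::Project { cols, input: filter_input }),
                }
            }
            other => Plan::Project { cols, input: Box::new(other) },
        },
        other => other,
    }
}
```

The rewritten projection now outputs `b` as well as `a`, which is why the rewrite relies on multi-column `InSubquery`s.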





[GitHub] [arrow-datafusion] jon-chuang commented on a diff in pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
jon-chuang commented on code in PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#discussion_r865692147


##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -46,6 +47,227 @@ impl SubqueryFilterToJoin {
     pub fn new() -> Self {
         Self {}
     }
+
+    fn are_correlated_columns(
+        &self,
+        outer: &Arc<DFSchema>,
+        column_a: &Column,
+        column_b: &Column,
+    ) -> Option<(Column, Column)> {
+        if column_is_correlated(outer, column_a) {
+            return Some((column_a.clone(), column_b.clone()));
+        } else if column_is_correlated(outer, column_b) {
+            return Some((column_b.clone(), column_a.clone()));
+        }
+        None
+    }
+
+    // TODO: do we need to check correlation/dependency only with outer input top-level schema?
+    // NOTE: We only match against an equality filter with an outer column
+    fn extract_correlated_columns(
+        &self,
+        expr: &Expr,
+        outer: &Arc<DFSchema>,
+        correlated_columns: &mut Vec<(Column, Column)>,
+    ) -> Option<Expr> {
+        let mut filters = vec![];
+        // This will also strip aliases
+        utils::split_conjunction(expr, &mut filters);
+
+        let mut non_correlated_predicates = vec![];
+        for filter in filters {
+            match filter {
+                Expr::BinaryExpr { left, op, right } => {
+                    let mut extracted_column = false;
+                    if let (Expr::Column(column_a), Expr::Column(column_b)) =
+                        (left.as_ref(), right.as_ref())
+                    {
+                        if let Some(columns) =
+                            self.are_correlated_columns(outer, &column_a, &column_b)
+                        {
+                            if *op == Operator::Eq {
+                                correlated_columns.push(columns);
+                                extracted_column = true;
+                            }
+                        }
+                    }
+                    if !extracted_column {
+                        non_correlated_predicates.push(filter);
+                    }
+                }
+                _ => non_correlated_predicates.push(filter),
+            }
+        }
+
+        if non_correlated_predicates.is_empty() {
+            None
+        } else {
+            Some(utils::combine_conjunctive(&non_correlated_predicates))
+        }
+    }
+
+    fn rewrite_outer_plan(
+        &self,
+        outer_plan: LogicalPlan,
+        expr: &Expr,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
+        match expr {
+            Expr::InSubquery {
+                expr,
+                subquery,
+                negated,
+            } => {
+                let mut correlated_columns = vec![];
+                let subquery_ref = &*subquery.subquery;
+                let right_decorrelated_plan = match subquery_ref {
+                    // NOTE: We only pattern match against Projection(Filter(..)). We will have another optimization rule
+                    // which tries to pull up all correlated predicates in an InSubquery into a Projection(Filter(..))
+                    // at the root node of the InSubquery's subquery. The Projection at the root must have as its expression
+                    // a single Column.

Review Comment:
   This may be somewhat restrictive. It can be expanded, but it needs some work. In particular, joins will need to accept expressions rather than just joining blindly on rows.





[GitHub] [arrow-datafusion] jon-chuang commented on a diff in pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
jon-chuang commented on code in PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#discussion_r865692147


##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -46,6 +47,227 @@ impl SubqueryFilterToJoin {
     pub fn new() -> Self {
         Self {}
     }
+
+    fn are_correlated_columns(
+        &self,
+        outer: &Arc<DFSchema>,
+        column_a: &Column,
+        column_b: &Column,
+    ) -> Option<(Column, Column)> {
+        if column_is_correlated(outer, column_a) {
+            return Some((column_a.clone(), column_b.clone()));
+        } else if column_is_correlated(outer, column_b) {
+            return Some((column_b.clone(), column_a.clone()));
+        }
+        None
+    }
+
+    // TODO: do we need to check correlation/dependency only with outer input top-level schema?
+    // NOTE: We only match against an equality filter with an outer column
+    fn extract_correlated_columns(
+        &self,
+        expr: &Expr,
+        outer: &Arc<DFSchema>,
+        correlated_columns: &mut Vec<(Column, Column)>,
+    ) -> Option<Expr> {
+        let mut filters = vec![];
+        // This will also strip aliases
+        utils::split_conjunction(expr, &mut filters);
+
+        let mut non_correlated_predicates = vec![];
+        for filter in filters {
+            match filter {
+                Expr::BinaryExpr { left, op, right } => {
+                    let mut extracted_column = false;
+                    if let (Expr::Column(column_a), Expr::Column(column_b)) =
+                        (left.as_ref(), right.as_ref())
+                    {
+                        if let Some(columns) =
+                            self.are_correlated_columns(outer, &column_a, &column_b)
+                        {
+                            if *op == Operator::Eq {
+                                correlated_columns.push(columns);
+                                extracted_column = true;
+                            }
+                        }
+                    }
+                    if !extracted_column {
+                        non_correlated_predicates.push(filter);
+                    }
+                }
+                _ => non_correlated_predicates.push(filter),
+            }
+        }
+
+        if non_correlated_predicates.is_empty() {
+            None
+        } else {
+            Some(utils::combine_conjunctive(&non_correlated_predicates))
+        }
+    }
+
+    fn rewrite_outer_plan(
+        &self,
+        outer_plan: LogicalPlan,
+        expr: &Expr,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
+        match expr {
+            Expr::InSubquery {
+                expr,
+                subquery,
+                negated,
+            } => {
+                let mut correlated_columns = vec![];
+                let subquery_ref = &*subquery.subquery;
+                let right_decorrelated_plan = match subquery_ref {
+                    // NOTE: We only pattern match against Projection(Filter(..)). We will have another optimization rule
+                    // which tries to pull up all correlated predicates in an InSubquery into a Projection(Filter(..))
+                    // at the root node of the InSubquery's subquery. The Projection at the root must have as its expression
+                    // a single Column.

Review Comment:
   This may be somewhat restrictive. ~~It can be expanded, but it needs some work. In particular, joins will need to accept expressions rather than just joining blindly on rows.~~
   
   Edit: Actually, it's probably fine to have the following:
   ```
   Projection(col(a))
     Filter(col(b)=col(outer.b))
       Projection(alias(col(a).plus(col(c)), a), col(b))
   ```
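A sketch of why the plan above should still match: the pattern only needs to inspect the two root nodes, so a computed column aliased further down does not get in the way. The plan type and `match_in_subquery_root` are hypothetical toy stand-ins, with expressions and predicates kept as opaque strings.

```rust
/// A projection item: a plain column or an aliased expression (kept as text).
#[derive(Debug, Clone, PartialEq)]
enum ProjExpr {
    Column(String),
    Alias { expr: String, name: String },
}

/// Toy logical plan; predicates are opaque strings.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Projection { exprs: Vec<ProjExpr>, input: Box<Plan> },
    Filter { predicate: String, input: Box<Plan> },
}

/// Matches `Projection([single column]) -> Filter(..) -> <anything>` and returns
/// the projected column name and the filter predicate. Nodes below the Filter
/// are not inspected, so an aliased computed column there is allowed.
fn match_in_subquery_root(plan: &Plan) -> Option<(&str, &str)> {
    if let Plan::Projection { exprs, input } = plan {
        if let ([ProjExpr::Column(name)], Plan::Filter { predicate, .. }) =
            (exprs.as_slice(), &**input)
        {
            return Some((name.as_str(), predicate.as_str()));
        }
    }
    None
}
```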





[GitHub] [arrow-datafusion] alamb commented on pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#issuecomment-1191665878

   @avantgardnerio noted that https://github.com/apache/arrow-datafusion/pull/2885 has some non-trivial overlap with this PR, in case you ever get back to it, @jon-chuang




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#discussion_r884142592


##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -64,105 +233,41 @@ impl OptimizerRule for SubqueryFilterToJoin {
                 utils::split_conjunction(predicate, &mut filters);

Review Comment:
   `A OR (B AND C)` can't be split into separate conjuncts by `split_conjunction`.
   
   I'm not sure whether that will have an effect.
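A toy illustration of why, using a hypothetical `Expr` with opaque leaf predicates rather than DataFusion's type. A conjunction splitter only recurses through top-level `AND` nodes, so an `OR` at the root comes back as a single predicate, and any correlated equality inside it cannot be pulled out as a join key.

```rust
/// Toy boolean expression; leaves are opaque named predicates.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Pred(&'static str),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

/// Converts `A AND B AND C` into `[A, B, C]`; stops at any non-AND node.
fn split_conjunction<'a>(expr: &'a Expr, out: &mut Vec<&'a Expr>) {
    match expr {
        Expr::And(l, r) => {
            split_conjunction(l, out);
            split_conjunction(r, out);
        }
        other => out.push(other),
    }
}
```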





[GitHub] [arrow-datafusion] alamb commented on pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#issuecomment-1138376665

   @jon-chuang do you have time to work on this PR? It looks like it has accumulated some conflicts, and there are some suggestions about tests. If not, I can find some time to do a final polish and merge it in.




[GitHub] [arrow-datafusion] alamb commented on pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#issuecomment-1120414361

   Sorry I haven't had a chance to review this PR yet @jon-chuang  -- I will put it on my queue and hopefully get to it shortly




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#discussion_r868056509


##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -386,4 +504,112 @@ mod tests {
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
     }
+
+    #[test]
+    fn test_exists_simple() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(col("table_a.a").eq(col("table_b.a")))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_multiple_correlated_filters() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+
+        // Test AND and nested filters will be extracted as join columns
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.c").eq(col("table_b.c"))).and(
+                    (col("table_a.a").eq(col("table_b.a")))
+                        .and(col("table_a.b").eq(col("table_b.b"))),
+                ),
+            )?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.c = #table_b.c, #table_a.a = #table_b.a, #table_a.b = #table_b.b [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_with_non_correlated_filter() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.a").eq(col("table_b.a")))
+                    .and(col("table_b.b").gt(lit("5"))),
+            )?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .project(vec![col("a"), col("b")])?
+            .filter(exists(Arc::new(subquery)))?
+            .build()?;
+        let expected = "\
+            Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32]\
+            \n  Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n  Filter: #table_b.b > Utf8(\"5\") [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    // We only test not exists for the simplest case since all the other code paths

Review Comment:
   👍 
   
   I think there might be some subtleties involving NULLs (for example, if the subquery only has NULLs and there is a non-equality filter like `table_b.a > 5`).
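One well-known NULL subtlety of this kind affects `NOT IN` rather than `NOT EXISTS`. `x NOT IN (subquery)` is UNKNOWN, so the row is filtered out, whenever `x` matches nothing but the subquery contains a NULL. A naive anti join on `x = y` keeps that row. This sketch models SQL three-valued logic with `Option`. The function names are hypothetical and not tied to any DataFusion API.

```rust
/// SQL three-valued `x NOT IN (list)`; `None` models UNKNOWN.
/// A WHERE clause keeps the row only when this returns `Some(true)`.
fn sql_not_in(x: Option<i64>, list: &[Option<i64>]) -> Option<bool> {
    let x = match x {
        Some(v) => v,
        // NULL NOT IN (empty) is TRUE; NULL NOT IN (non-empty) is UNKNOWN.
        None => return if list.is_empty() { Some(true) } else { None },
    };
    let mut saw_null = false;
    for item in list {
        match item {
            Some(v) if *v == x => return Some(false),
            Some(_) => {}
            None => saw_null = true,
        }
    }
    // No match: TRUE only if no NULL could have been equal to `x`.
    if saw_null { None } else { Some(true) }
}

/// Naive anti join on `x = item`: NULL never compares equal,
/// so NULLs in `list` are simply ignored.
fn naive_anti_join_keeps(x: Option<i64>, list: &[Option<i64>]) -> bool {
    match x {
        None => true,
        Some(v) => !list.iter().any(|item| *item == Some(v)),
    }
}
```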



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -348,11 +556,36 @@ mod tests {
         let expected = "Projection: #test.b [b:UInt32]\
         \n  Semi Join: #test.b = #sq.a [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Projection: #sq.a [a:UInt32]\
-        \n      Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n        TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]\
-        \n        Projection: #sq_nested.c [c:UInt32]\
-        \n          TableScan: sq_nested projection=None [a:UInt32, b:UInt32, c:UInt32]";
+        \n    Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\

Review Comment:
   > The predicate pullup rule (next PR) will be able to "commute" a projection and a filter as follows:
   
   This sounds like a subset of the filter "pushdown" rule -- so maybe we can just invoke that again rather than adding a new special-case rule.



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -386,4 +504,112 @@ mod tests {
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
     }
+
+    #[test]
+    fn test_exists_simple() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(col("table_a.a").eq(col("table_b.a")))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_multiple_correlated_filters() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+
+        // Test AND and nested filters will be extracted as join columns
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.c").eq(col("table_b.c"))).and(
+                    (col("table_a.a").eq(col("table_b.a")))
+                        .and(col("table_a.b").eq(col("table_b.b"))),
+                ),
+            )?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.c = #table_b.c, #table_a.a = #table_b.a, #table_a.b = #table_b.b [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_with_non_correlated_filter() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.a").eq(col("table_b.a")))
+                    .and(col("table_b.b").gt(lit("5"))),
+            )?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .project(vec![col("a"), col("b")])?
+            .filter(exists(Arc::new(subquery)))?
+            .build()?;
+        let expected = "\
+            Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32]\
+            \n  Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n  Filter: #table_b.b > Utf8(\"5\") [a:UInt32, b:UInt32, c:UInt32]\

Review Comment:
   I vaguely remember some corner cases related to these non-equality predicates and SEMI joins, but when I tried to come up with examples of issues with this plan, I could not. 👍 
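One way to gain some confidence is a brute-force comparison on a small toy input. This hypothetical model evaluates `EXISTS (... WHERE outer.a = inner.a AND inner.b > 5)` row by row and compares it with the rewritten shape: filter `inner.b > 5` first, then semi join on `a`. It only covers this one predicate shape and is not a proof.

```rust
/// Toy row with nullable columns `a` and `b`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Row {
    a: Option<i64>,
    b: Option<i64>,
}

/// SQL `x = y` is TRUE only when both sides are non-NULL and equal.
fn eq_true(x: Option<i64>, y: Option<i64>) -> bool {
    matches!((x, y), (Some(p), Some(q)) if p == q)
}

/// SQL `x > 5` is TRUE only for non-NULL values greater than 5.
fn gt5_true(x: Option<i64>) -> bool {
    matches!(x, Some(v) if v > 5)
}

/// Original semantics: keep an outer row if some inner row makes
/// `outer.a = inner.a AND inner.b > 5` TRUE.
fn exists_semantics(outer: &[Row], inner: &[Row]) -> Vec<Row> {
    outer
        .iter()
        .copied()
        .filter(|o| inner.iter().any(|i| eq_true(o.a, i.a) && gt5_true(i.b)))
        .collect()
}

/// Rewritten shape: apply `inner.b > 5` to the inner side first,
/// then semi join on `outer.a = inner.a`.
fn semi_join_rewrite(outer: &[Row], inner: &[Row]) -> Vec<Row> {
    let filtered: Vec<Row> = inner.iter().copied().filter(|i| gt5_true(i.b)).collect();
    outer
        .iter()
        .copied()
        .filter(|o| filtered.iter().any(|i| eq_true(o.a, i.a)))
        .collect()
}
```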



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -386,4 +504,112 @@ mod tests {
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
     }
+
+    #[test]
+    fn test_exists_simple() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(col("table_a.a").eq(col("table_b.a")))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_multiple_correlated_filters() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+
+        // Test that AND-ed and nested filters are extracted as join columns
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.c").eq(col("table_b.c"))).and(
+                    (col("table_a.a").eq(col("table_b.a")))

Review Comment:
   I suggest adding at least one predicate with the subquery's table (`table_b`) on the left, such as `table_b.a = table_a.`, since all of these predicates have `table_a` on the left.
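The orientation question can be checked in isolation. This sketch mirrors what `are_correlated_columns` appears to do: it returns the pair as (outer, inner) regardless of which side of `=` the outer column is written on. It uses hypothetical `is_outer` and `orient` helpers over string column names instead of DataFusion's `Column` and `DFSchema`.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for `column_is_correlated`: a column is treated as
/// correlated if its qualified name appears in the outer schema.
fn is_outer(outer_schema: &HashSet<&str>, column: &str) -> bool {
    outer_schema.contains(column)
}

/// Returns (outer, inner) whichever side of `=` the outer column was on,
/// or None when neither column comes from the outer schema.
fn orient<'a>(
    outer_schema: &HashSet<&str>,
    left: &'a str,
    right: &'a str,
) -> Option<(&'a str, &'a str)> {
    if is_outer(outer_schema, left) {
        Some((left, right))
    } else if is_outer(outer_schema, right) {
        Some((right, left))
    } else {
        None
    }
}
```

A predicate written as `table_b.a = table_a.a` goes through the second branch. The current tests may never reach that branch if every predicate puts `table_a` first.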



##########
datafusion/core/src/optimizer/utils.rs:
##########
@@ -574,23 +600,180 @@ pub fn split_conjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>
     }
 }
 
-/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with
-/// its predicate be all `predicates` ANDed.
-pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
-    // reduce filters to a single filter with an AND
-    let predicate = predicates
-        .iter()
-        .skip(1)
-        .fold(predicates[0].clone(), |acc, predicate| {
-            and(acc, (*predicate).to_owned())
-        });
-
-    LogicalPlan::Filter(Filter {
-        predicate,
-        input: Arc::new(plan),
-    })
+/// converts "A OR B OR C" => [A, B, C]
+pub fn split_disjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
+    match predicate {
+        Expr::BinaryExpr {
+            right,
+            op: Operator::Or,
+            left,
+        } => {
+            split_disjunction(left, predicates);
+            split_disjunction(right, predicates);
+        }
+        Expr::Alias(expr, _) => {
+            split_disjunction(expr, predicates);
+        }
+        other => predicates.push(other),
+    }
+}
+
+/// Converts [A, B, C] -> A AND B AND C
+pub fn combine_conjunctive(predicates: &[&Expr]) -> Option<Expr> {
+    if predicates.is_empty() {
+        None
+    } else {
+        // reduce filters to a single filter with an AND
+        Some(
+            predicates
+                .iter()
+                .skip(1)
+                .fold(predicates[0].clone(), |acc, predicate| {
+                    and(acc, (*predicate).to_owned())
+                }),
+        )
+    }
+}
+
+/// Converts [A, B, C] -> A OR B OR C
+pub fn combine_disjunctive(predicates: &[&Expr]) -> Option<Expr> {
+    if predicates.is_empty() {
+        None
+    } else {
+        // reduce filters to a single filter with an OR
+        Some(
+            predicates
+                .iter()
+                .skip(1)
+                .fold(predicates[0].clone(), |acc, predicate| {
+                    or(acc, (*predicate).to_owned())
+                }),
+        )
+    }
+}
+
+/// Recursively walk an expression tree, returning true if it encounters a joinable subquery
+struct SubqueryVisitor<'a> {
+    contains_joinable_subquery: &'a mut bool,
+}
+
+impl ExpressionVisitor for SubqueryVisitor<'_> {
+    fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
+        match expr {
+            Expr::InSubquery { .. } | Expr::Exists { .. } => {
+                *self.contains_joinable_subquery = true;
+                return Ok(Recursion::Stop(self));
+            }
+            _ => {}
+        }
+        Ok(Recursion::Continue(self))
+    }
+}
+
+/// Recursively walk an expression tree, returning true if it encounters a joinable subquery
+pub fn contains_joinable_subquery(expr: &Expr) -> Result<bool> {
+    let mut contains_joinable_subquery = false;
+    expr.accept(SubqueryVisitor {
+        contains_joinable_subquery: &mut contains_joinable_subquery,
+    })?;
+    Ok(contains_joinable_subquery)
+}
+
+/// Checks if the column belongs to the outer schema
+pub(crate) fn column_is_correlated(outer: &Arc<DFSchema>, column: &Column) -> bool {
+    for field in outer.fields() {
+        if *column == field.qualified_column() || *column == field.unqualified_column() {
+            return true;
+        }
+    }
+    false
+}
+
+/// Recursively walk an expression tree, returning true if it encounters a correlated column
+struct CorrelatedColumnsVisitor<'a> {
+    outer_schema: &'a Arc<DFSchema>,
+    contains_correlated_columns: &'a mut bool,
+}
+
+impl ExpressionVisitor for CorrelatedColumnsVisitor<'_> {
+    fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
+        if let Expr::Column(c) = expr {
+            if column_is_correlated(self.outer_schema, c) {
+                *self.contains_correlated_columns = true;
+                return Ok(Recursion::Stop(self));
+            }
+        }
+        Ok(Recursion::Continue(self))
+    }
+}
+
+/// Recursively walk an expression tree, returning true if it encounters a correlated column
+pub fn contains_correlated_columns(
+    outer_schema: &Arc<DFSchema>,
+    expr: &Expr,
+) -> Result<bool> {
+    let mut contains_correlated_columns = false;
+    expr.accept(CorrelatedColumnsVisitor {
+        outer_schema,
+        contains_correlated_columns: &mut contains_correlated_columns,
+    })?;
+    Ok(contains_correlated_columns)

Review Comment:
   I don't think it really matters, but this pattern could also be expressed as follows to avoid some `mut` (assuming the `contains_correlated_columns` field has been changed to a `bool`):
   
   ```suggestion
       let visitor = expr.accept(CorrelatedColumnsVisitor {
           outer_schema,
           contains_correlated_columns: false,
       })?;
       Ok(visitor.contains_correlated_columns)
   ```
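Below is a self-contained sketch of the suggested by-value visitor pattern. The toy `Expr`, `Recursion`, and `ExpressionVisitor` are simplified, hypothetical stand-ins rather than DataFusion's definitions: they have no `Result` and no `post_visit`. Because `accept` hands the visitor back, the flag can be a plain `bool` field instead of a `&mut bool`.

```rust
/// Toy expression tree (hypothetical; not DataFusion's `Expr`).
enum Expr {
    Column(String),
    Literal(i64),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// `Stop` skips the children of the current node; siblings are still visited.
enum Recursion<V> {
    Continue(V),
    Stop(V),
}

trait ExpressionVisitor: Sized {
    fn pre_visit(self, expr: &Expr) -> Recursion<Self>;
}

impl Expr {
    /// Pre-order walk that threads the visitor through by value and returns it.
    fn accept<V: ExpressionVisitor>(&self, visitor: V) -> V {
        let visitor = match visitor.pre_visit(self) {
            Recursion::Continue(v) => v,
            Recursion::Stop(v) => return v,
        };
        match self {
            Expr::Eq(l, r) | Expr::And(l, r) => {
                let visitor = l.accept(visitor);
                r.accept(visitor)
            }
            Expr::Column(_) | Expr::Literal(_) => visitor,
        }
    }
}

/// The state lives in a plain `bool` field, so no `&mut bool` is needed.
struct CorrelatedColumnsVisitor<'a> {
    outer_columns: &'a [&'a str],
    contains_correlated_columns: bool,
}

impl ExpressionVisitor for CorrelatedColumnsVisitor<'_> {
    fn pre_visit(mut self, expr: &Expr) -> Recursion<Self> {
        if let Expr::Column(name) = expr {
            if self.outer_columns.contains(&name.as_str()) {
                self.contains_correlated_columns = true;
                return Recursion::Stop(self);
            }
        }
        Recursion::Continue(self)
    }
}

fn contains_correlated_columns(outer_columns: &[&str], expr: &Expr) -> bool {
    let visitor = expr.accept(CorrelatedColumnsVisitor {
        outer_columns,
        contains_correlated_columns: false,
    });
    visitor.contains_correlated_columns
}
```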



##########
datafusion/core/src/optimizer/utils.rs:
##########
@@ -556,6 +556,32 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
     }
 }
 
+/// Returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] whose
+/// predicate is all `predicates` ANDed together.
+pub fn filter_by_all(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
+    if let Some(predicate) = combine_conjunctive(predicates) {

Review Comment:
   👍 
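The `Option`-returning `combine_conjunctive` also avoids a panic in the old `add_filter`, which indexed `predicates[0]` even when the slice was empty. The following is a minimal sketch with toy `Expr` and `Plan` types (hypothetical). The `None` branch of `filter_by_all`, which returns the plan unchanged, is an assumption, since the diff excerpt stops after the `if let`.

```rust
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Pred(&'static str),
    And(Box<Expr>, Box<Expr>),
}

#[derive(Debug, PartialEq)]
enum Plan {
    Scan(&'static str),
    Filter { predicate: Expr, input: Box<Plan> },
}

fn and(l: Expr, r: Expr) -> Expr {
    Expr::And(Box::new(l), Box::new(r))
}

/// [A, B, C] -> Some((A AND B) AND C); [] -> None (same left fold as the diff).
fn combine_conjunctive(predicates: &[&Expr]) -> Option<Expr> {
    let (first, rest) = predicates.split_first()?;
    Some(rest.iter().fold((*first).clone(), |acc, p| and(acc, (*p).clone())))
}

/// Assumed behavior: with no predicates, the plan is returned unchanged
/// instead of being wrapped in a Filter.
fn filter_by_all(plan: Plan, predicates: &[&Expr]) -> Plan {
    match combine_conjunctive(predicates) {
        Some(predicate) => Plan::Filter { predicate, input: Box::new(plan) },
        None => plan,
    }
}
```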



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -46,6 +46,173 @@ impl SubqueryFilterToJoin {
     pub fn new() -> Self {
         Self {}
     }
+
+    fn rewrite_correlated_subquery_as_join(
+        &self,
+        outer_plan: LogicalPlan,
+        subquery_expr: &Expr,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
+        match subquery_expr {
+            Expr::InSubquery {
+                expr,
+                subquery,
+                negated,
+            } => {
+                let mut correlated_join_columns = vec![];
+                let subquery_ref = &*subquery.subquery;
+                let right_decorrelated_plan = match subquery_ref {
+                    // NOTE: We only pattern match against Projection(Filter(..)). We will have another optimization rule
+                    // which tries to pull up all correlated predicates in an InSubquery into a Projection(Filter(..))
+                    // at the root node of the InSubquery's subquery. The Projection at the root must have as its expression
+                    // a single Column.
+                    LogicalPlan::Projection(Projection { input, expr, .. }) => {
+                        if expr.len() != 1 {
+                            return Err(DataFusionError::Plan(
+                                "Only single column allowed in InSubquery".to_string(),
+                            ));
+                        };
+                        match (&expr[0], &**input) {
+                            (
+                                Expr::Column(right_key),
+                                LogicalPlan::Filter(Filter { predicate, input }),
+                            ) => {
+                                // Extract correlated columns as join columns from the filter predicate
+                                let non_correlated_predicate =
+                                    utils::extract_correlated_as_join_columns(
+                                        predicate,
+                                        outer_plan.schema(),
+                                        &mut correlated_join_columns,
+                                    );
+
+                                // Strip the projection away and use its input for the semi/anti-join

Review Comment:
   Can you explain why stripping the projection is needed? I don't understand it.
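One plausible reason for dropping the projection is an assumption, not something the author confirmed. The projection keeps only the single `IN` column, while the correlated join keys extracted from the filter refer to other subquery columns, so keeping the projection would remove those columns from the right input's schema. The sketch models each plan node only by its output column names (`Node` and `unresolved_right_keys` are hypothetical). It uses `table_a.c IN (SELECT table_b.b FROM table_b WHERE table_b.a = table_a.a)`.

```rust
use std::collections::HashSet;

/// Hypothetical model: a plan node is reduced to the set of columns it outputs.
struct Node {
    output: HashSet<&'static str>,
}

/// Right-hand join keys that the right input does not produce
/// (an empty result means every key resolves).
fn unresolved_right_keys(right: &Node, on: &[(&'static str, &'static str)]) -> Vec<&'static str> {
    on.iter()
        .map(|(_, r)| *r)
        .filter(|r| !right.output.contains(r))
        .collect()
}
```

With join keys `(table_a.c, table_b.b)` from the projection and `(table_a.a, table_b.a)` from the correlated predicate, the projection's output `{table_b.b}` leaves `table_b.a` unresolved, while the filter's input resolves both. Another option would be to keep the projection and extend it with the extracted join columns.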



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -64,105 +233,41 @@ impl OptimizerRule for SubqueryFilterToJoin {
                 utils::split_conjunction(predicate, &mut filters);
 
                 // Searching for subquery-based filters
-                let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) =
-                    filters
-                        .into_iter()
-                        .partition(|&e| matches!(e, Expr::InSubquery { .. }));
-
-                // Check all subquery filters could be rewritten
-                //
-                // In case of expressions which could not be rewritten
-                // return original filter with optimized input
-                let mut subqueries_in_regular = vec![];
-                regular_filters.iter().try_for_each(|&e| {
-                    extract_subquery_filters(e, &mut subqueries_in_regular)
-                })?;
-
-                if !subqueries_in_regular.is_empty() {
-                    return Ok(LogicalPlan::Filter(Filter {
-                        predicate: predicate.clone(),
-                        input: Arc::new(optimized_input),
-                    }));
-                };
-
-                // Add subquery joins to new_input
-                // optimized_input value should retain for possible optimization rollback
-                let opt_result = subquery_filters.iter().try_fold(
-                    optimized_input.clone(),
-                    |input, &e| match e {
-                        Expr::InSubquery {
-                            expr,
-                            subquery,
-                            negated,
-                        } => {
-                            let right_input = self.optimize(
-                                &*subquery.subquery,
-                                execution_props
-                            )?;
-                            let right_schema = right_input.schema();
-                            if right_schema.fields().len() != 1 {
-                                return Err(DataFusionError::Plan(
-                                    "Only single column allowed in InSubquery"
-                                        .to_string(),
-                                ));
-                            };
-
-                            let right_key = right_schema.field(0).qualified_column();
-                            let left_key = match *expr.clone() {
-                                Expr::Column(col) => col,
-                                _ => return Err(DataFusionError::NotImplemented(
-                                    "Filtering by expression not implemented for InSubquery"
-                                        .to_string(),
-                                )),
-                            };
-
-                            let join_type = if *negated {
-                                JoinType::Anti
-                            } else {
-                                JoinType::Semi
-                            };
-
-                            let schema = build_join_schema(
-                                optimized_input.schema(),
-                                right_schema,
-                                &join_type,
-                            )?;
-
-                            Ok(LogicalPlan::Join(Join {
-                                left: Arc::new(input),
-                                right: Arc::new(right_input),
-                                on: vec![(left_key, right_key)],
-                                join_type,
-                                join_constraint: JoinConstraint::On,
-                                schema: Arc::new(schema),
-                                null_equals_null: false,
-                            }))
-                        }
-                        _ => Err(DataFusionError::Plan(
-                            "Unknown expression while rewriting subquery to joins"
-                                .to_string(),
-                        )),
-                    }
-                );
-
-                // In case of expressions which could not be rewritten
-                // return original filter with optimized input
-                let new_input = match opt_result {
-                    Ok(plan) => plan,
-                    Err(_) => {
+                let (subquery_filters, remainder): (Vec<&Expr>, Vec<&Expr>) =
+                    filters.into_iter().partition(|&e| {
+                        matches!(e, Expr::InSubquery { .. } | Expr::Exists { .. })
+                    });
+
+                let remaining_predicate = utils::combine_conjunctive(&remainder);
+
+                if let Some(predicate) = remaining_predicate {
+                    // Since we are unable to simplify the correlated subquery,
+                    // we must do a row scan against the outer plan anyway, so we abort
+                    //
+                    // TODO: complex expressions which are disjunctive with our subquery expressions
+                    // can be rewritten as unions (without deduplication...)?
+                    if utils::contains_joinable_subquery(&predicate)? {
                         return Ok(LogicalPlan::Filter(Filter {
-                            predicate: predicate.clone(),
+                            predicate,
                             input: Arc::new(optimized_input),
-                        }))
+                        }));
                     }
-                };
-
-                // Apply regular filters to join output if some or just return join
-                if regular_filters.is_empty() {
-                    Ok(new_input)
-                } else {
-                    Ok(utils::add_filter(new_input, &regular_filters))
                 }
+
+                // Add subquery joins to optimized_input
+                let new_input = subquery_filters.iter().try_fold(
+                    optimized_input,
+                    |outer_plan, &subquery_expr| {
+                        self.rewrite_correlated_subquery_as_join(

Review Comment:
   👍 
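The control flow of the new `optimize` branch can be sketched with toy types. `Pred`, `contains_subquery`, and `rewrite` are hypothetical, and plans are rendered as strings. The sketch partitions the conjuncts and gives up if a subquery is still buried in the remainder (for example under `OR`). Otherwise it folds each subquery predicate into a semi join over the accumulated plan and applies the remaining predicates on top. Giving up returns `None` so the caller keeps the original, complete filter. Reapplying the remainder after the joins follows the old code's comment and is an assumption for the new version.

```rust
#[derive(Clone, Debug, PartialEq)]
enum Pred {
    Subquery(&'static str),
    Other(&'static str),
    Or(Box<Pred>, Box<Pred>),
}

fn contains_subquery(p: &Pred) -> bool {
    match p {
        Pred::Subquery(_) => true,
        Pred::Other(_) => false,
        Pred::Or(l, r) => contains_subquery(l) || contains_subquery(r),
    }
}

/// Returns Ok(None) when the filter must be left as it was.
fn rewrite(input: String, conjuncts: Vec<Pred>) -> Result<Option<String>, String> {
    let (subqueries, remainder): (Vec<Pred>, Vec<Pred>) = conjuncts
        .into_iter()
        .partition(|p| matches!(p, Pred::Subquery(_)));

    // A subquery nested inside another predicate cannot become a join here.
    if remainder.iter().any(contains_subquery) {
        return Ok(None);
    }

    // Each subquery predicate wraps the plan built so far in a semi join.
    let joined = subqueries.iter().try_fold(input, |outer, p| match p {
        Pred::Subquery(name) => Ok(format!("SemiJoin({}, {})", outer, name)),
        _ => Err("unexpected predicate while rewriting subquery to join".to_string()),
    })?;

    // Remaining predicates are applied as a filter on top of the joins.
    let plan = if remainder.is_empty() {
        joined
    } else {
        let parts: Vec<String> = remainder.iter().map(|p| format!("{:?}", p)).collect();
        format!("Filter({}, {})", parts.join(" AND "), joined)
    };
    Ok(Some(plan))
}
```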



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -46,6 +47,227 @@ impl SubqueryFilterToJoin {
     pub fn new() -> Self {
         Self {}
     }
+
+    fn are_correlated_columns(
+        &self,
+        outer: &Arc<DFSchema>,
+        column_a: &Column,
+        column_b: &Column,
+    ) -> Option<(Column, Column)> {
+        if column_is_correlated(outer, column_a) {
+            return Some((column_a.clone(), column_b.clone()));
+        } else if column_is_correlated(outer, column_b) {
+            return Some((column_b.clone(), column_a.clone()));
+        }
+        None
+    }
+
+    // TODO: do we need to check correlation/dependency only with outer input top-level schema?
+    // NOTE: We only match against an equality filter with an outer column
+    fn extract_correlated_columns(
+        &self,
+        expr: &Expr,
+        outer: &Arc<DFSchema>,
+        correlated_columns: &mut Vec<(Column, Column)>,
+    ) -> Option<Expr> {
+        let mut filters = vec![];
+        // This will also strip aliases
+        utils::split_conjunction(expr, &mut filters);
+
+        let mut non_correlated_predicates = vec![];
+        for filter in filters {
+            match filter {
+                Expr::BinaryExpr { left, op, right } => {
+                    let mut extracted_column = false;
+                    if let (Expr::Column(column_a), Expr::Column(column_b)) =
+                        (left.as_ref(), right.as_ref())
+                    {
+                        if let Some(columns) =
+                            self.are_correlated_columns(outer, &column_a, &column_b)
+                        {
+                            if *op == Operator::Eq {
+                                correlated_columns.push(columns);
+                                extracted_column = true;
+                            }
+                        }
+                    }
+                    if !extracted_column {
+                        non_correlated_predicates.push(filter);
+                    }
+                }
+                _ => non_correlated_predicates.push(filter),
+            }
+        }
+
+        if non_correlated_predicates.is_empty() {
+            None
+        } else {
+            Some(utils::combine_conjunctive(&non_correlated_predicates))
+        }
+    }
+
+    fn rewrite_outer_plan(
+        &self,
+        outer_plan: LogicalPlan,
+        expr: &Expr,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
+        match expr {
+            Expr::InSubquery {
+                expr,
+                subquery,
+                negated,
+            } => {
+                let mut correlated_columns = vec![];
+                let subquery_ref = &*subquery.subquery;
+                let right_decorrelated_plan = match subquery_ref {
+                    // NOTE: We only pattern match against Projection(Filter(..)). We will have another optimization rule
+                    // which tries to pull up all correlated predicates in an InSubquery into a Projection(Filter(..))
+                    // at the root node of the InSubquery's subquery. The Projection at the root must have as its expression
+                    // a single Column.

Review Comment:
   Yes, I agree that re-using a `Projection` to calculate expressions is reasonable.
   
   I think there are certain types of OUTER joins where the actual evaluation needs to happen within the join (because there, the presence of a NULL means something different from the row being filtered out).
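Here is one concrete case where a NULL changes the meaning, sketched in plain Rust with `Option` standing in for SQL NULL (all names are hypothetical). `x NOT IN (1, NULL)` evaluates to UNKNOWN for `x = 2`, so a `WHERE` clause drops the row, while a naive anti join on `x = item` keeps it. Because the rule maps a negated `InSubquery` to `JoinType::Anti`, an anti join used for `NOT IN` needs NULL-aware handling to match SQL semantics.

```rust
/// SQL three-valued `x NOT IN (list)`; None stands for NULL / UNKNOWN.
fn not_in(x: Option<i32>, list: &[Option<i32>]) -> Option<bool> {
    // NOT IN over an empty set is TRUE, even for a NULL x.
    if list.is_empty() {
        return Some(true);
    }
    let x = match x {
        Some(v) => v,
        None => return None,
    };
    let mut saw_null = false;
    for item in list {
        match item {
            Some(v) if *v == x => return Some(false),
            Some(_) => {}
            None => saw_null = true,
        }
    }
    // No match, but a NULL in the list makes the result UNKNOWN.
    if saw_null { None } else { Some(true) }
}

/// Naive anti join on `x = item`: keep x when no item equals it
/// (NULL never compares equal, so NULL keys and NULL items never match).
fn naive_anti_join_keeps(x: Option<i32>, list: &[Option<i32>]) -> bool {
    !list.iter().any(|item| x.is_some() && *item == x)
}

/// A WHERE clause keeps a row only when its predicate is TRUE.
fn where_keeps(p: Option<bool>) -> bool {
    p == Some(true)
}
```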



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#issuecomment-1207225727

   I believe this PR has been superseded now, so closing. Please feel free to reopen if you plan to keep working on it. 




[GitHub] [arrow-datafusion] alamb closed pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1
URL: https://github.com/apache/arrow-datafusion/pull/2451




[GitHub] [arrow-datafusion] jackwener commented on pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
jackwener commented on PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#issuecomment-1138230969

   Great work! 😀❤ I'm a little busy with work right now, so I will review it on the weekend.




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#discussion_r884142166


##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -386,4 +504,112 @@ mod tests {
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
     }
+
+    #[test]
+    fn test_exists_simple() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(col("table_a.a").eq(col("table_b.a")))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_multiple_correlated_filters() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+
+        // Test that AND-ed and nested filters are extracted as join columns
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.c").eq(col("table_b.c"))).and(
+                    (col("table_a.a").eq(col("table_b.a")))
+                        .and(col("table_a.b").eq(col("table_b.b"))),

Review Comment:
   I think we need a test like `A or (B and C)`, because such a predicate can't be split.
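This small sketch uses a toy `Expr` to show why such a test is interesting (the type is hypothetical, and aliases are not modeled). `split_conjunction` only descends through `AND`, so `A OR (B AND C)` comes back as a single conjunct, and no correlated equality inside it can be pulled out as a join key on its own. `split_disjunction` splits it into `A` and `B AND C`.

```rust
#[derive(Debug, PartialEq)]
enum Expr {
    Pred(&'static str),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

/// Same shape as `split_conjunction`: descend only through AND.
fn split_conjunction<'a>(e: &'a Expr, out: &mut Vec<&'a Expr>) {
    match e {
        Expr::And(l, r) => {
            split_conjunction(l, out);
            split_conjunction(r, out);
        }
        other => out.push(other),
    }
}

/// Same shape as `split_disjunction`: descend only through OR.
fn split_disjunction<'a>(e: &'a Expr, out: &mut Vec<&'a Expr>) {
    match e {
        Expr::Or(l, r) => {
            split_disjunction(l, out);
            split_disjunction(r, out);
        }
        other => out.push(other),
    }
}
```

A test along these lines would show whether the rule leaves such a conjunct in the subquery's filter, where it would still reference the outer table, or declines the rewrite.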





[GitHub] [arrow-datafusion] alamb commented on pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#issuecomment-1148958974

   Marking as draft, as it needs some work before merging.

