You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/09 16:10:34 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2451: Add `EXISTS` and `IN` subquery rewriting for correlated filters at filter depth 1

alamb commented on code in PR #2451:
URL: https://github.com/apache/arrow-datafusion/pull/2451#discussion_r868056509


##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -386,4 +504,112 @@ mod tests {
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
     }
+
+    #[test]
+    fn test_exists_simple() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(col("table_a.a").eq(col("table_b.a")))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_multiple_correlated_filters() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+
+        // Test AND and nested filters will be extracted as join columns
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.c").eq(col("table_b.c"))).and(
+                    (col("table_a.a").eq(col("table_b.a")))
+                        .and(col("table_a.b").eq(col("table_b.b"))),
+                ),
+            )?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.c = #table_b.c, #table_a.a = #table_b.a, #table_a.b = #table_b.b [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_with_non_correlated_filter() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.a").eq(col("table_b.a")))
+                    .and(col("table_b.b").gt(lit("5"))),
+            )?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .project(vec![col("a"), col("b")])?
+            .filter(exists(Arc::new(subquery)))?
+            .build()?;
+        let expected = "\
+            Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32]\
+            \n  Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n  Filter: #table_b.b > Utf8(\"5\") [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    // We only test not exists for the simplest case since all the other code paths

Review Comment:
   👍 
   
   I think there might be some subtleties involving NULLs (like if the subquery only has nulls and there is a non-equality filter like `table_b.a > 5`)



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -348,11 +556,36 @@ mod tests {
         let expected = "Projection: #test.b [b:UInt32]\
         \n  Semi Join: #test.b = #sq.a [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Projection: #sq.a [a:UInt32]\
-        \n      Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n        TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]\
-        \n        Projection: #sq_nested.c [c:UInt32]\
-        \n          TableScan: sq_nested projection=None [a:UInt32, b:UInt32, c:UInt32]";
+        \n    Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\

Review Comment:
   > The predicate pullup rule (next PR) will be able to "commute" a projection and a filter as follows:
   
   This sounds like a subset of the filter "pushdown" rule -- so maybe we can just invoke that again rather than adding a new special case rule



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -386,4 +504,112 @@ mod tests {
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
     }
+
+    #[test]
+    fn test_exists_simple() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(col("table_a.a").eq(col("table_b.a")))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_multiple_correlated_filters() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+
+        // Test AND and nested filters will be extracted as join columns
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.c").eq(col("table_b.c"))).and(
+                    (col("table_a.a").eq(col("table_b.a")))
+                        .and(col("table_a.b").eq(col("table_b.b"))),
+                ),
+            )?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.c = #table_b.c, #table_a.a = #table_b.a, #table_a.b = #table_b.b [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_with_non_correlated_filter() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.a").eq(col("table_b.a")))
+                    .and(col("table_b.b").gt(lit("5"))),
+            )?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .project(vec![col("a"), col("b")])?
+            .filter(exists(Arc::new(subquery)))?
+            .build()?;
+        let expected = "\
+            Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32]\
+            \n  Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n  Filter: #table_b.b > Utf8(\"5\") [a:UInt32, b:UInt32, c:UInt32]\

Review Comment:
   I vaguely remember some corner cases related to these non equality predicates and SEMI joins, but when I tried to come up with some examples of issues with this plan I could not. 👍 



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -386,4 +504,112 @@ mod tests {
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
     }
+
+    #[test]
+    fn test_exists_simple() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(col("table_a.a").eq(col("table_b.a")))?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_a)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+
+        let expected = "\
+            Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
+            \n  Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
+            \n    TableScan: table_b projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_exists_multiple_correlated_filters() -> Result<()> {
+        let table_a = test_table_scan_with_name("table_a")?;
+        let table_b = test_table_scan_with_name("table_b")?;
+
+        // Test AND and nested filters will be extracted as join columns
+        let subquery = LogicalPlanBuilder::from(table_b)
+            .filter(
+                (col("table_a.c").eq(col("table_b.c"))).and(
+                    (col("table_a.a").eq(col("table_b.a")))

Review Comment:
   I suggest at least one predicate that has the subquery / table_b on the left -- like `table_b.a = table_a.a` -- as all these predicates have `table_a` on the left



##########
datafusion/core/src/optimizer/utils.rs:
##########
@@ -574,23 +600,180 @@ pub fn split_conjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>
     }
 }
 
-/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with
-/// its predicate be all `predicates` ANDed.
-pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
-    // reduce filters to a single filter with an AND
-    let predicate = predicates
-        .iter()
-        .skip(1)
-        .fold(predicates[0].clone(), |acc, predicate| {
-            and(acc, (*predicate).to_owned())
-        });
-
-    LogicalPlan::Filter(Filter {
-        predicate,
-        input: Arc::new(plan),
-    })
+/// converts "A OR B OR C" => [A, B, C]
+pub fn split_disjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
+    match predicate {
+        Expr::BinaryExpr {
+            right,
+            op: Operator::Or,
+            left,
+        } => {
+            split_disjunction(left, predicates);
+            split_disjunction(right, predicates);
+        }
+        Expr::Alias(expr, _) => {
+            split_disjunction(expr, predicates);
+        }
+        other => predicates.push(other),
+    }
+}
+
+/// Converts [A, B, C] -> A AND B AND C
+pub fn combine_conjunctive(predicates: &[&Expr]) -> Option<Expr> {
+    if predicates.is_empty() {
+        None
+    } else {
+        // reduce filters to a single filter with an AND
+        Some(
+            predicates
+                .iter()
+                .skip(1)
+                .fold(predicates[0].clone(), |acc, predicate| {
+                    and(acc, (*predicate).to_owned())
+                }),
+        )
+    }
+}
+
+/// Converts [A, B, C] -> A OR B OR C
+pub fn combine_disjunctive(predicates: &[&Expr]) -> Option<Expr> {
+    if predicates.is_empty() {
+        None
+    } else {
+        // reduce filters to a single filter with an OR
+        Some(
+            predicates
+                .iter()
+                .skip(1)
+                .fold(predicates[0].clone(), |acc, predicate| {
+                    or(acc, (*predicate).to_owned())
+                }),
+        )
+    }
+}
+
+/// Recursively walk an expression tree, returning true if it encounters a joinable subquery
+struct SubqueryVisitor<'a> {
+    contains_joinable_subquery: &'a mut bool,
+}
+
+impl ExpressionVisitor for SubqueryVisitor<'_> {
+    fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
+        match expr {
+            Expr::InSubquery { .. } | Expr::Exists { .. } => {
+                *self.contains_joinable_subquery = true;
+                return Ok(Recursion::Stop(self));
+            }
+            _ => {}
+        }
+        Ok(Recursion::Continue(self))
+    }
+}
+
+/// Recursively walk an expression tree, returning true if it encounters a joinable subquery
+pub fn contains_joinable_subquery(expr: &Expr) -> Result<bool> {
+    let mut contains_joinable_subquery = false;
+    expr.accept(SubqueryVisitor {
+        contains_joinable_subquery: &mut contains_joinable_subquery,
+    })?;
+    Ok(contains_joinable_subquery)
+}
+
+/// Checks if the column belongs to the outer schema
+pub(crate) fn column_is_correlated(outer: &Arc<DFSchema>, column: &Column) -> bool {
+    for field in outer.fields() {
+        if *column == field.qualified_column() || *column == field.unqualified_column() {
+            return true;
+        }
+    }
+    false
+}
+
+/// Recursively walk an expression tree, returning true if it encounters a joinable subquery
+struct CorrelatedColumnsVisitor<'a> {
+    outer_schema: &'a Arc<DFSchema>,
+    contains_correlated_columns: &'a mut bool,
+}
+
+impl ExpressionVisitor for CorrelatedColumnsVisitor<'_> {
+    fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
+        if let Expr::Column(c) = expr {
+            if column_is_correlated(self.outer_schema, c) {
+                *self.contains_correlated_columns = true;
+                return Ok(Recursion::Stop(self));
+            }
+        }
+        Ok(Recursion::Continue(self))
+    }
+}
+
+/// Recursively walk an expression tree, returning true if it encounters a correlated column
+pub fn contains_correlated_columns(
+    outer_schema: &Arc<DFSchema>,
+    expr: &Expr,
+) -> Result<bool> {
+    let mut contains_correlated_columns = false;
+    expr.accept(CorrelatedColumnsVisitor {
+        outer_schema,
+        contains_correlated_columns: &mut contains_correlated_columns,
+    })?;
+    Ok(contains_correlated_columns)

Review Comment:
   I don't think it really matters, but this type of pattern could also be expressed like this, to avoid some `mut` (assuming the visitor's `contains_correlated_columns` field has been changed from `&mut bool` to a plain `bool`):
   
   ```suggestion
       let visitor = expr.accept(CorrelatedColumnsVisitor {
           outer_schema,
           contains_correlated_columns: false,
       })?;
       Ok(visitor.contains_correlated_columns)
   ```



##########
datafusion/core/src/optimizer/utils.rs:
##########
@@ -556,6 +556,32 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
     }
 }
 
+// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with
+/// its predicate with all `predicates` ANDed.
+pub fn filter_by_all(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
+    if let Some(predicate) = combine_conjunctive(predicates) {

Review Comment:
   👍 



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -46,6 +46,173 @@ impl SubqueryFilterToJoin {
     pub fn new() -> Self {
         Self {}
     }
+
+    fn rewrite_correlated_subquery_as_join(
+        &self,
+        outer_plan: LogicalPlan,
+        subquery_expr: &Expr,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
+        match subquery_expr {
+            Expr::InSubquery {
+                expr,
+                subquery,
+                negated,
+            } => {
+                let mut correlated_join_columns = vec![];
+                let subquery_ref = &*subquery.subquery;
+                let right_decorrelated_plan = match subquery_ref {
+                    // NOTE: We only pattern match against Projection(Filter(..)). We will have another optimization rule
+                    // which tries to pull up all correlated predicates in an InSubquery into a Projection(Filter(..))
+                    // at the root node of the InSubquery's subquery. The Projection at the root must have as its expression
+                    // a single Column.
+                    LogicalPlan::Projection(Projection { input, expr, .. }) => {
+                        if expr.len() != 1 {
+                            return Err(DataFusionError::Plan(
+                                "Only single column allowed in InSubquery".to_string(),
+                            ));
+                        };
+                        match (&expr[0], &**input) {
+                            (
+                                Expr::Column(right_key),
+                                LogicalPlan::Filter(Filter { predicate, input }),
+                            ) => {
+                                // Extract correlated columns as join columns from the filter predicate
+                                let non_correlated_predicate =
+                                    utils::extract_correlated_as_join_columns(
+                                        predicate,
+                                        outer_plan.schema(),
+                                        &mut correlated_join_columns,
+                                    );
+
+                                // Strip the projection away and use its input for the semi/anti-join

Review Comment:
   Can you explain why this stripping is needed? I don't understand it



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -64,105 +233,41 @@ impl OptimizerRule for SubqueryFilterToJoin {
                 utils::split_conjunction(predicate, &mut filters);
 
                 // Searching for subquery-based filters
-                let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) =
-                    filters
-                        .into_iter()
-                        .partition(|&e| matches!(e, Expr::InSubquery { .. }));
-
-                // Check all subquery filters could be rewritten
-                //
-                // In case of expressions which could not be rewritten
-                // return original filter with optimized input
-                let mut subqueries_in_regular = vec![];
-                regular_filters.iter().try_for_each(|&e| {
-                    extract_subquery_filters(e, &mut subqueries_in_regular)
-                })?;
-
-                if !subqueries_in_regular.is_empty() {
-                    return Ok(LogicalPlan::Filter(Filter {
-                        predicate: predicate.clone(),
-                        input: Arc::new(optimized_input),
-                    }));
-                };
-
-                // Add subquery joins to new_input
-                // optimized_input value should retain for possible optimization rollback
-                let opt_result = subquery_filters.iter().try_fold(
-                    optimized_input.clone(),
-                    |input, &e| match e {
-                        Expr::InSubquery {
-                            expr,
-                            subquery,
-                            negated,
-                        } => {
-                            let right_input = self.optimize(
-                                &*subquery.subquery,
-                                execution_props
-                            )?;
-                            let right_schema = right_input.schema();
-                            if right_schema.fields().len() != 1 {
-                                return Err(DataFusionError::Plan(
-                                    "Only single column allowed in InSubquery"
-                                        .to_string(),
-                                ));
-                            };
-
-                            let right_key = right_schema.field(0).qualified_column();
-                            let left_key = match *expr.clone() {
-                                Expr::Column(col) => col,
-                                _ => return Err(DataFusionError::NotImplemented(
-                                    "Filtering by expression not implemented for InSubquery"
-                                        .to_string(),
-                                )),
-                            };
-
-                            let join_type = if *negated {
-                                JoinType::Anti
-                            } else {
-                                JoinType::Semi
-                            };
-
-                            let schema = build_join_schema(
-                                optimized_input.schema(),
-                                right_schema,
-                                &join_type,
-                            )?;
-
-                            Ok(LogicalPlan::Join(Join {
-                                left: Arc::new(input),
-                                right: Arc::new(right_input),
-                                on: vec![(left_key, right_key)],
-                                join_type,
-                                join_constraint: JoinConstraint::On,
-                                schema: Arc::new(schema),
-                                null_equals_null: false,
-                            }))
-                        }
-                        _ => Err(DataFusionError::Plan(
-                            "Unknown expression while rewriting subquery to joins"
-                                .to_string(),
-                        )),
-                    }
-                );
-
-                // In case of expressions which could not be rewritten
-                // return original filter with optimized input
-                let new_input = match opt_result {
-                    Ok(plan) => plan,
-                    Err(_) => {
+                let (subquery_filters, remainder): (Vec<&Expr>, Vec<&Expr>) =
+                    filters.into_iter().partition(|&e| {
+                        matches!(e, Expr::InSubquery { .. } | Expr::Exists { .. })
+                    });
+
+                let remaining_predicate = utils::combine_conjunctive(&remainder);
+
+                if let Some(predicate) = remaining_predicate {
+                    // Since we are unable to simplify the correlated subquery,
+                    // we must do a row scan against the outer plan anyway, so we abort
+                    //
+                    // TODO: complex expressions which are disjunctive with our subquery expressions
+                    // can be rewritten as unions (without deduplication...)?
+                    if utils::contains_joinable_subquery(&predicate)? {
                         return Ok(LogicalPlan::Filter(Filter {
-                            predicate: predicate.clone(),
+                            predicate,
                             input: Arc::new(optimized_input),
-                        }))
+                        }));
                     }
-                };
-
-                // Apply regular filters to join output if some or just return join
-                if regular_filters.is_empty() {
-                    Ok(new_input)
-                } else {
-                    Ok(utils::add_filter(new_input, &regular_filters))
                 }
+
+                // Add subquery joins to optimized_input
+                let new_input = subquery_filters.iter().try_fold(
+                    optimized_input,
+                    |outer_plan, &subquery_expr| {
+                        self.rewrite_correlated_subquery_as_join(

Review Comment:
   👍 



##########
datafusion/core/src/optimizer/subquery_filter_to_join.rs:
##########
@@ -46,6 +47,227 @@ impl SubqueryFilterToJoin {
     pub fn new() -> Self {
         Self {}
     }
+
+    fn are_correlated_columns(
+        &self,
+        outer: &Arc<DFSchema>,
+        column_a: &Column,
+        column_b: &Column,
+    ) -> Option<(Column, Column)> {
+        if column_is_correlated(outer, column_a) {
+            return Some((column_a.clone(), column_b.clone()));
+        } else if column_is_correlated(outer, column_b) {
+            return Some((column_b.clone(), column_a.clone()));
+        }
+        None
+    }
+
+    // TODO: do we need to check correlation/dependency only with outer input top-level schema?
+    // NOTE: We only match against an equality filter with an outer column
+    fn extract_correlated_columns(
+        &self,
+        expr: &Expr,
+        outer: &Arc<DFSchema>,
+        correlated_columns: &mut Vec<(Column, Column)>,
+    ) -> Option<Expr> {
+        let mut filters = vec![];
+        // This will also strip aliases
+        utils::split_conjunction(expr, &mut filters);
+
+        let mut non_correlated_predicates = vec![];
+        for filter in filters {
+            match filter {
+                Expr::BinaryExpr { left, op, right } => {
+                    let mut extracted_column = false;
+                    if let (Expr::Column(column_a), Expr::Column(column_b)) =
+                        (left.as_ref(), right.as_ref())
+                    {
+                        if let Some(columns) =
+                            self.are_correlated_columns(outer, &column_a, &column_b)
+                        {
+                            if *op == Operator::Eq {
+                                correlated_columns.push(columns);
+                                extracted_column = true;
+                            }
+                        }
+                    }
+                    if !extracted_column {
+                        non_correlated_predicates.push(filter);
+                    }
+                }
+                _ => non_correlated_predicates.push(filter),
+            }
+        }
+
+        if non_correlated_predicates.is_empty() {
+            None
+        } else {
+            Some(utils::combine_conjunctive(&non_correlated_predicates))
+        }
+    }
+
+    fn rewrite_outer_plan(
+        &self,
+        outer_plan: LogicalPlan,
+        expr: &Expr,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
+        match expr {
+            Expr::InSubquery {
+                expr,
+                subquery,
+                negated,
+            } => {
+                let mut correlated_columns = vec![];
+                let subquery_ref = &*subquery.subquery;
+                let right_decorrelated_plan = match subquery_ref {
+                    // NOTE: We only pattern match against Projection(Filter(..)). We will have another optimization rule
+                    // which tries to pull up all correlated predicates in an InSubquery into a Projection(Filter(..))
+                    // at the root node of the InSubquery's subquery. The Projection at the root must have as its expression
+                    // a single Column.

Review Comment:
   Yes I agree that re-using a `Projection` to calculate expressions is reasonable.
   
   I think there are certain types of OUTER joins where the actual evaluation needs to happen within the join (as the presence of a NULL means something different than the row being filtered)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org