You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/29 20:09:49 UTC

[GitHub] [arrow-datafusion] avantgardnerio opened a new pull request, #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

avantgardnerio opened a new pull request, #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813

   # Which issue does this PR close?
   
   Closes #160.
   
    # Rationale for this change
   
   In order to evaluate DataFusion as a candidate query engine, users need to be able to run industry standard benchmarks like TPC-H. Query 4 is a good initial candidate, because it is being blocked only by a relatively simple optimization rule to turn `exists` subqueries into `join`s. 
   
   This PR includes the minimum necessary changes to get Query 4 passing, but I believe this is a generalizable approach that will work for the remaining queries in the TPC-H suite being blocked by subquery-related issues.
   
   I wanted to PR early to start the conversation, but I intend to either submit subsequent PRs generalizing this approach, or extend this PR until we have all the TPC-H subquery cases covered.
   
   # What changes are included in this PR?
   An optimization rule for decorelating a narrowly defined set of queries. Those not explicitly covered will remain unaltered.
   
   # Are there any user-facing changes?
   Any single-column join `where exists` correlated subquery should now work.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] avantgardnerio commented on pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#issuecomment-1182120430

   Closed in favor of #2885 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#discussion_r910488804


##########
datafusion/optimizer/src/subquery_decorrelate.rs:
##########
@@ -0,0 +1,142 @@
+use crate::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_common::Column;
+use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Operator};
+use hashbrown::HashSet;
+use std::sync::Arc;
+
+/// Optimizer rule for rewriting subquery filters to joins
+#[derive(Default)]
+pub struct SubqueryDecorrelate {}
+
+impl SubqueryDecorrelate {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for SubqueryDecorrelate {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> datafusion_common::Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(Filter { predicate, input }) => {
+                return match predicate {
+                    // TODO: arbitrary expressions
+                    Expr::Exists { subquery, negated } => {
+                        if *negated {
+                            return Ok(plan.clone());
+                        }
+                        optimize_exists(plan, subquery, input)
+                    }
+                    _ => Ok(plan.clone()),
+                };
+            }
+            _ => {
+                // Apply the optimization to all inputs of the plan
+                utils::optimize_children(self, plan, optimizer_config)
+            }
+        }
+    }
+
+    fn name(&self) -> &str {
+        "subquery_decorrelate"
+    }
+}
+
+/// Takes a query like:
+///
+/// select c.id from customers c where exists (select * from orders o where o.c_id = c.id)
+///
+/// and optimizes it into:
+///
+/// select c.id from customers c
+/// inner join (select o.c_id from orders o group by o.c_id) o on o.c_id = c.c_id
+fn optimize_exists(
+    plan: &LogicalPlan,
+    subquery: &Subquery,
+    input: &Arc<LogicalPlan>,
+) -> datafusion_common::Result<LogicalPlan> {
+    // Only operate if there is one input
+    let sub_inputs = subquery.subquery.inputs();
+    if sub_inputs.len() != 1 {
+        return Ok(plan.clone());
+    }
+    let sub_input = if let Some(i) = sub_inputs.get(0) {
+        i
+    } else {
+        return Ok(plan.clone());
+    };
+
+    // Only operate on subqueries that are trying to filter on an expression from an outer query
+    let filter = if let LogicalPlan::Filter(f) = sub_input {
+        f
+    } else {
+        return Ok(plan.clone());
+    };
+
+    // Only operate on a single binary equality expression (for now)
+    let (left, op, right) =
+        if let Expr::BinaryExpr { left, op, right } = &filter.predicate {
+            (left, op, right)
+        } else {
+            return Ok(plan.clone());
+        };
+    match op {
+        Operator::Eq => {}
+        _ => return Ok(plan.clone()),
+    }
+
+    // collect list of columns
+    let lcol = match &**left {
+        Expr::Column(col) => col,
+        _ => return Ok(plan.clone()),
+    };
+    let rcol = match &**right {
+        Expr::Column(col) => col,
+        _ => return Ok(plan.clone()),
+    };
+    let cols = vec![lcol, rcol];
+    let cols: HashSet<_> = cols.iter().map(|c| &c.name).collect();
+    let fields: HashSet<_> = sub_input
+        .schema()
+        .fields()
+        .iter()
+        .map(|f| f.name())
+        .collect();
+
+    // Only operate if one column is present and the other closed upon from outside scope
+    let found: Vec<_> = cols.intersection(&fields).map(|it| (*it).clone()).collect();
+    let closed_upon: Vec<_> = cols.difference(&fields).map(|it| (*it).clone()).collect();
+    if found.len() != 1 || closed_upon.len() != 1 {
+        return Ok(plan.clone());
+    }
+    let found = if let Some(it) = found.get(0) {
+        it
+    } else {
+        return Ok(plan.clone());
+    };
+    let closed_upon = if let Some(it) = closed_upon.get(0) {
+        it
+    } else {
+        return Ok(plan.clone());
+    };
+
+    let c_col = vec![Column::from_qualified_name(closed_upon)];
+    let f_col = vec![Column::from_qualified_name(found)];
+    let expr = vec![Expr::Column(found.as_str().into())];
+    let group_expr = vec![Expr::Column(found.as_str().into())];
+    let aggr_expr: Vec<Expr> = vec![];
+    let join_keys = (c_col.clone(), f_col.clone());
+    let right = LogicalPlanBuilder::from((*filter.input).clone())
+        .aggregate(group_expr, aggr_expr)?

Review Comment:
   You could just use `distinct` rather than create the aggregate. It is semantically equivalent and result in a simpler logical plan. It will get translated into an aggregate in the physical plan.
   
   ```suggestion
       let join_keys = (c_col.clone(), f_col.clone());
       let right = LogicalPlanBuilder::from((*filter.input).clone())
           .distinct()?
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#discussion_r910508170


##########
datafusion/optimizer/src/subquery_decorrelate.rs:
##########
@@ -0,0 +1,168 @@
+use crate::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_common::Column;
+use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
+use datafusion_expr::{combine_filters, Expr, LogicalPlan, LogicalPlanBuilder, Operator};
+use hashbrown::HashSet;
+use itertools::{Either, Itertools};
+use std::sync::Arc;
+
+/// Optimizer rule for rewriting subquery filters to joins
+#[derive(Default)]
+pub struct SubqueryDecorrelate {}
+
+impl SubqueryDecorrelate {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for SubqueryDecorrelate {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> datafusion_common::Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(Filter { predicate, input }) => {
+                match predicate {
+                    // TODO: arbitrary expressions
+                    Expr::Exists { subquery, negated } => {
+                        if *negated {
+                            return Ok(plan.clone());
+                        }
+                        optimize_exists(plan, subquery, input)
+                    }
+                    _ => Ok(plan.clone()),
+                }
+            }
+            _ => {
+                // Apply the optimization to all inputs of the plan
+                utils::optimize_children(self, plan, optimizer_config)
+            }
+        }
+    }
+
+    fn name(&self) -> &str {
+        "subquery_decorrelate"
+    }
+}
+
+/// Takes a query like:
+///
+/// select c.id from customers c where exists (select * from orders o where o.c_id = c.id)
+///
+/// and optimizes it into:
+///
+/// select c.id from customers c
+/// inner join (select o.c_id from orders o group by o.c_id) o on o.c_id = c.c_id
+fn optimize_exists(
+    plan: &LogicalPlan,
+    subquery: &Subquery,
+    input: &Arc<LogicalPlan>,
+) -> datafusion_common::Result<LogicalPlan> {
+    // Only operate if there is one input
+    let sub_inputs = subquery.subquery.inputs();
+    if sub_inputs.len() != 1 {
+        return Ok(plan.clone());
+    }
+    let sub_input = if let Some(i) = sub_inputs.get(0) {
+        i
+    } else {
+        return Ok(plan.clone());
+    };
+
+    // Only operate on subqueries that are trying to filter on an expression from an outer query
+    let filter = if let LogicalPlan::Filter(f) = sub_input {
+        f
+    } else {
+        return Ok(plan.clone());
+    };

Review Comment:
   I think it might be more idiomatic to use a `match` for these patterns.
   
   ```suggestion
       let filter = match sub_input {
           LogicalPlan::Filter(f) => f,
           _ => return Ok(plan.clone())
       };
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#issuecomment-1170614728

   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2813?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2813](https://codecov.io/gh/apache/arrow-datafusion/pull/2813?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (858b284) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/839a61896f0fc3f617a79da1dd4018b2aa6af283?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (839a618) will **decrease** coverage by `0.01%`.
   > The diff coverage is `72.68%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #2813      +/-   ##
   ==========================================
   - Coverage   85.20%   85.19%   -0.02%     
   ==========================================
     Files         274      276       +2     
     Lines       48666    48848     +182     
   ==========================================
   + Hits        41468    41616     +148     
   - Misses       7198     7232      +34     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/2813?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [datafusion/common/src/scalar.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2813/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb21tb24vc3JjL3NjYWxhci5ycw==) | `74.94% <ø> (+0.11%)` | :arrow_up: |
   | [datafusion/core/tests/sql/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2813/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3Rlc3RzL3NxbC9tb2QucnM=) | `93.25% <0.00%> (-4.39%)` | :arrow_down: |
   | [...tafusion/physical-expr/src/expressions/datetime.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2813/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9waHlzaWNhbC1leHByL3NyYy9leHByZXNzaW9ucy9kYXRldGltZS5ycw==) | `59.21% <64.40%> (+26.55%)` | :arrow_up: |
   | [datafusion/optimizer/src/subquery\_decorrelate.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2813/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9vcHRpbWl6ZXIvc3JjL3N1YnF1ZXJ5X2RlY29ycmVsYXRlLnJz) | `82.85% <82.85%> (ø)` | |
   | [datafusion/core/tests/sql/subqueries.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2813/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3Rlc3RzL3NxbC9zdWJxdWVyaWVzLnJz) | `88.23% <88.23%> (ø)` | |
   | [datafusion/core/src/execution/context.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2813/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9leGVjdXRpb24vY29udGV4dC5ycw==) | `79.02% <100.00%> (+0.02%)` | :arrow_up: |
   | [datafusion/core/tests/sql/timestamp.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2813/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3Rlc3RzL3NxbC90aW1lc3RhbXAucnM=) | `100.00% <100.00%> (ø)` | |
   | [datafusion/optimizer/src/simplify\_expressions.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2813/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9vcHRpbWl6ZXIvc3JjL3NpbXBsaWZ5X2V4cHJlc3Npb25zLnJz) | `82.04% <100.00%> (+0.02%)` | :arrow_up: |
   | [datafusion/core/src/physical\_plan/metrics/value.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2813/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9waHlzaWNhbF9wbGFuL21ldHJpY3MvdmFsdWUucnM=) | `86.93% <0.00%> (-0.51%)` | :arrow_down: |
   | ... and [3 more](https://codecov.io/gh/apache/arrow-datafusion/pull/2813/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2813?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2813?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [839a618...858b284](https://codecov.io/gh/apache/arrow-datafusion/pull/2813?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] avantgardnerio commented on a diff in pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#discussion_r910374325


##########
datafusion/optimizer/src/subquery_decorrelate.rs:
##########
@@ -0,0 +1,138 @@
+use crate::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_common::Column;
+use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
+use hashbrown::HashSet;
+use std::sync::Arc;
+
+/// Optimizer rule for rewriting subquery filters to joins
+#[derive(Default)]
+pub struct SubqueryDecorrelate {}
+
+impl SubqueryDecorrelate {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for SubqueryDecorrelate {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> datafusion_common::Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(Filter { predicate, input }) => {
+                return match predicate {
+                    // TODO: arbitrary expressions
+                    Expr::Exists { subquery, negated } => {
+                        if *negated {
+                            return Ok(plan.clone());

Review Comment:
   In any case of doubt, fall back to skipping optimization, following the "do no harm" rule.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] avantgardnerio commented on a diff in pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#discussion_r910372690


##########
datafusion/common/src/scalar.rs:
##########
@@ -37,6 +37,8 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};
 
 /// Represents a dynamically typed, nullable single value.
 /// This is the single-valued counter-part of arrow’s `Array`.
+/// https://arrow.apache.org/docs/python/api/datatypes.html
+/// https://github.com/apache/arrow/blob/master/format/Schema.fbs#L354-L375

Review Comment:
   Sorry, this was built upon #2797 . I'll turn this into a draft until that gets merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] avantgardnerio commented on a diff in pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#discussion_r910377177


##########
datafusion/optimizer/src/subquery_decorrelate.rs:
##########
@@ -0,0 +1,138 @@
+use crate::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_common::Column;
+use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
+use hashbrown::HashSet;
+use std::sync::Arc;
+
+/// Optimizer rule for rewriting subquery filters to joins
+#[derive(Default)]
+pub struct SubqueryDecorrelate {}
+
+impl SubqueryDecorrelate {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for SubqueryDecorrelate {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> datafusion_common::Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(Filter { predicate, input }) => {
+                return match predicate {
+                    // TODO: arbitrary expressions
+                    Expr::Exists { subquery, negated } => {
+                        if *negated {
+                            return Ok(plan.clone());
+                        }
+                        optimize_exists(plan, subquery, input)
+                    }
+                    _ => Ok(plan.clone()),
+                };
+            }
+            _ => {
+                // Apply the optimization to all inputs of the plan
+                utils::optimize_children(self, plan, optimizer_config)
+            }
+        }
+    }
+
+    fn name(&self) -> &str {
+        "subquery_decorrelate"
+    }
+}
+
+/// Takes a query like:
+///
+/// select c.id from customers c where exists (select * from orders o where o.c_id = c.id)
+///
+/// and optimizes it into:
+///
+/// select c.id from customers c
+/// inner join (select o.c_id from orders o group by o.c_id) o on o.c_id = c.c_id
+fn optimize_exists(
+    plan: &LogicalPlan,
+    subquery: &Subquery,
+    input: &Arc<LogicalPlan>,
+) -> datafusion_common::Result<LogicalPlan> {
+    // Only operate if there is one input
+    let sub_inputs = subquery.subquery.inputs();
+    if sub_inputs.len() != 1 {
+        return Ok(plan.clone());
+    }
+    let sub_input = if let Some(i) = sub_inputs.get(0) {
+        i
+    } else {
+        return Ok(plan.clone());
+    };
+
+    // Only operate on subqueries that are trying to filter on an expression from an outer query
+    let filter = if let LogicalPlan::Filter(f) = sub_input {
+        f
+    } else {
+        return Ok(plan.clone());
+    };
+
+    // Only operate on a single binary expression (for now)
+    let (left, _op, right) =
+        if let Expr::BinaryExpr { left, op, right } = &filter.predicate {
+            (left, op, right)
+        } else {
+            return Ok(plan.clone());
+        };
+
+    // collect list of columns
+    let lcol = match &**left {
+        Expr::Column(col) => col,
+        _ => return Ok(plan.clone()),
+    };
+    let rcol = match &**right {
+        Expr::Column(col) => col,
+        _ => return Ok(plan.clone()),
+    };
+    let cols = vec![lcol, rcol];
+    let cols: HashSet<_> = cols.iter().map(|c| &c.name).collect();
+    let fields: HashSet<_> = sub_input
+        .schema()
+        .fields()
+        .iter()
+        .map(|f| f.name())
+        .collect();
+
+    // Only operate if one column is present and the other closed upon from outside scope
+    let found: Vec<_> = cols.intersection(&fields).map(|it| (*it).clone()).collect();
+    let closed_upon: Vec<_> = cols.difference(&fields).map(|it| (*it).clone()).collect();

Review Comment:
   We should truly resolve closed-upon scope here, instead of assuming if it's not in the present scope it must exist elsewhere. Queries will fail either way, but this could cause the error messages to be significantly more difficult for users to debug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] avantgardnerio closed pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
avantgardnerio closed pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`
URL: https://github.com/apache/arrow-datafusion/pull/2813


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] avantgardnerio commented on pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#issuecomment-1171624690

   I double checked with ```cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 4 --batch-size 4096``` and observed that this didn't resolve "the real" query 4, just my stripped down minimal failing test case.
   
   I made some adjustments, and I was able to run query 4 with the presently committed code:
   
   ```
   +-----------------+-------------+
   | o_orderpriority | order_count |
   +-----------------+-------------+
   | 1-URGENT        | 10594       |
   | 2-HIGH          | 10476       |
   | 3-MEDIUM        | 10410       |
   | 4-NOT SPECIFIED | 10556       |
   | 5-LOW           | 10487       |
   +-----------------+-------------+
   Query 4 iteration 2 took 43617.9 ms and returned 5 rows
   Query 4 avg time: 45785.94 ms
   ```
   
   This is slow, but matches my postgres results:
   ```
   +---------------+-----------+
   |o_orderpriority|order_count|
   +---------------+-----------+
   |1-URGENT       |10594      |
   |2-HIGH         |10476      |
   |3-MEDIUM       |10410      |
   |4-NOT SPECIFIED|10556      |
   |5-LOW          |10487      |
   +---------------+-----------+
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#issuecomment-1171347173

   Thanks @avantgardnerio. This looks good overall and the logic is easy to follow. I will review again when https://github.com/apache/arrow-datafusion/pull/2797 is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] avantgardnerio commented on pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#issuecomment-1172925124

   Probably duplicated work with #2421 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#discussion_r910501054


##########
datafusion/optimizer/src/subquery_decorrelate.rs:
##########
@@ -0,0 +1,142 @@
+use crate::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_common::Column;
+use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Operator};
+use hashbrown::HashSet;
+use std::sync::Arc;
+
+/// Optimizer rule for rewriting subquery filters to joins
+#[derive(Default)]
+pub struct SubqueryDecorrelate {}
+
+impl SubqueryDecorrelate {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for SubqueryDecorrelate {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> datafusion_common::Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(Filter { predicate, input }) => {
+                return match predicate {
+                    // TODO: arbitrary expressions
+                    Expr::Exists { subquery, negated } => {
+                        if *negated {
+                            return Ok(plan.clone());
+                        }
+                        optimize_exists(plan, subquery, input)
+                    }
+                    _ => Ok(plan.clone()),
+                };
+            }
+            _ => {
+                // Apply the optimization to all inputs of the plan
+                utils::optimize_children(self, plan, optimizer_config)
+            }
+        }
+    }
+
+    fn name(&self) -> &str {
+        "subquery_decorrelate"
+    }
+}
+
+/// Takes a query like:
+///
+/// select c.id from customers c where exists (select * from orders o where o.c_id = c.id)
+///
+/// and optimizes it into:
+///
+/// select c.id from customers c
+/// inner join (select o.c_id from orders o group by o.c_id) o on o.c_id = c.c_id
+fn optimize_exists(
+    plan: &LogicalPlan,
+    subquery: &Subquery,
+    input: &Arc<LogicalPlan>,
+) -> datafusion_common::Result<LogicalPlan> {
+    // Only operate if there is one input
+    let sub_inputs = subquery.subquery.inputs();
+    if sub_inputs.len() != 1 {
+        return Ok(plan.clone());
+    }
+    let sub_input = if let Some(i) = sub_inputs.get(0) {
+        i
+    } else {
+        return Ok(plan.clone());
+    };
+
+    // Only operate on subqueries that are trying to filter on an expression from an outer query
+    let filter = if let LogicalPlan::Filter(f) = sub_input {
+        f
+    } else {
+        return Ok(plan.clone());
+    };
+
+    // Only operate on a single binary equality expression (for now)
+    let (left, op, right) =
+        if let Expr::BinaryExpr { left, op, right } = &filter.predicate {
+            (left, op, right)
+        } else {
+            return Ok(plan.clone());
+        };
+    match op {
+        Operator::Eq => {}
+        _ => return Ok(plan.clone()),
+    }
+
+    // collect list of columns
+    let lcol = match &**left {
+        Expr::Column(col) => col,
+        _ => return Ok(plan.clone()),
+    };
+    let rcol = match &**right {
+        Expr::Column(col) => col,
+        _ => return Ok(plan.clone()),
+    };
+    let cols = vec![lcol, rcol];
+    let cols: HashSet<_> = cols.iter().map(|c| &c.name).collect();
+    let fields: HashSet<_> = sub_input
+        .schema()
+        .fields()
+        .iter()
+        .map(|f| f.name())
+        .collect();
+
+    // Only operate if one column is present and the other closed upon from outside scope
+    let found: Vec<_> = cols.intersection(&fields).map(|it| (*it).clone()).collect();
+    let closed_upon: Vec<_> = cols.difference(&fields).map(|it| (*it).clone()).collect();
+    if found.len() != 1 || closed_upon.len() != 1 {
+        return Ok(plan.clone());
+    }
+    let found = if let Some(it) = found.get(0) {
+        it
+    } else {
+        return Ok(plan.clone());
+    };
+    let closed_upon = if let Some(it) = closed_upon.get(0) {
+        it
+    } else {
+        return Ok(plan.clone());
+    };
+
+    let c_col = vec![Column::from_qualified_name(closed_upon)];
+    let f_col = vec![Column::from_qualified_name(found)];
+    let expr = vec![Expr::Column(found.as_str().into())];
+    let group_expr = vec![Expr::Column(found.as_str().into())];
+    let aggr_expr: Vec<Expr> = vec![];
+    let join_keys = (c_col.clone(), f_col.clone());
+    let right = LogicalPlanBuilder::from((*filter.input).clone())
+        .aggregate(group_expr, aggr_expr)?

Review Comment:
   Actually, I may have misunderstood. I thought this was grouping on all the columns but it looks that is not the case so please disregard this suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] avantgardnerio commented on a diff in pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#discussion_r910372934


##########
datafusion/core/tests/sql/mod.rs:
##########
@@ -483,7 +484,37 @@ fn get_tpch_table_schema(table: &str) -> Schema {
             Field::new("n_comment", DataType::Utf8, false),
         ]),
 
-        _ => unimplemented!(),
+        "supplier" => Schema::new(vec![

Review Comment:
   Add missing TPC-H tables to support testing those queries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#discussion_r911146230


##########
datafusion/optimizer/src/subquery_decorrelate.rs:
##########
@@ -0,0 +1,168 @@
+use crate::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_common::Column;
+use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
+use datafusion_expr::{combine_filters, Expr, LogicalPlan, LogicalPlanBuilder, Operator};
+use hashbrown::HashSet;
+use itertools::{Either, Itertools};
+use std::sync::Arc;
+
+/// Optimizer rule for rewriting subquery filters to joins
+#[derive(Default)]
+pub struct SubqueryDecorrelate {}
+
+impl SubqueryDecorrelate {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for SubqueryDecorrelate {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> datafusion_common::Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(Filter { predicate, input }) => {
+                match predicate {
+                    // TODO: arbitrary expressions
+                    Expr::Exists { subquery, negated } => {
+                        if *negated {
+                            return Ok(plan.clone());
+                        }
+                        optimize_exists(plan, subquery, input)
+                    }
+                    _ => Ok(plan.clone()),
+                }
+            }
+            _ => {
+                // Apply the optimization to all inputs of the plan
+                utils::optimize_children(self, plan, optimizer_config)
+            }
+        }
+    }
+
+    fn name(&self) -> &str {
+        "subquery_decorrelate"
+    }
+}
+
+/// Takes a query like:
+///
+/// select c.id from customers c where exists (select * from orders o where o.c_id = c.id)
+///
+/// and optimizes it into:
+///
+/// select c.id from customers c
+/// inner join (select o.c_id from orders o group by o.c_id) o on o.c_id = c.c_id
+fn optimize_exists(
+    plan: &LogicalPlan,
+    subquery: &Subquery,
+    input: &Arc<LogicalPlan>,
+) -> datafusion_common::Result<LogicalPlan> {
+    // Only operate if there is one input
+    let sub_inputs = subquery.subquery.inputs();
+    if sub_inputs.len() != 1 {
+        return Ok(plan.clone());
+    }
+    let sub_input = if let Some(i) = sub_inputs.get(0) {
+        i
+    } else {
+        return Ok(plan.clone());
+    };
+
+    // Only operate on subqueries that are trying to filter on an expression from an outer query
+    let filter = if let LogicalPlan::Filter(f) = sub_input {
+        f
+    } else {
+        return Ok(plan.clone());
+    };
+
+    // split into filters
+    let mut filters = vec![];
+    utils::split_conjunction(&filter.predicate, &mut filters);
+
+    // get names of fields TODO: Must fully qualify these!
+    let fields: HashSet<_> = sub_input
+        .schema()
+        .fields()
+        .iter()
+        .map(|f| f.name())
+        .collect();

Review Comment:
   You should be able to get a hashset of qualified names like this:
   
   ```suggestion
       let fields = HashSet::from_iter(sub_input
           .schema()
           .field_names());
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] avantgardnerio commented on pull request #2813: Optimize `where exists` sub-queries into `aggregate` and `join`

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#issuecomment-1171648580

   The remaining failing queries seem to fall into two categories:
   
   1. Ones that fail because we only handle `where exists` with this PR (not `where x < (subquery)` expressions)
   2. Ones that fail due to multiple subqueries in the same filter expression (which means we have to run this iteratively or something)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org