You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/02/16 19:10:36 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5264: Support non-tuple expression for exists-subquery to join

alamb commented on code in PR #5264:
URL: https://github.com/apache/arrow-datafusion/pull/5264#discussion_r1108896625


##########
datafusion/optimizer/src/utils.rs:
##########
@@ -468,6 +469,42 @@ pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema {
         })
 }
 
+/// Extract join predicates from the correclated subquery.
+/// The join predicate means that the expression references columns
+/// from both the subquery and outer table or only from the outer table.
+///
+/// Returns join predicates and subquery(extracted).
+/// ```
+pub(crate) fn extract_join_filters(
+    maybe_filter: &LogicalPlan,
+) -> Result<(Vec<Expr>, LogicalPlan)> {
+    if let LogicalPlan::Filter(plan_filter) = maybe_filter {
+        let input_schema = plan_filter.input.schema();
+        let subquery_filter_exprs = split_conjunction(&plan_filter.predicate);
+
+        let mut join_filters: Vec<Expr> = vec![];
+        let mut subquery_filters: Vec<Expr> = vec![];
+        for expr in subquery_filter_exprs {
+            let cols = expr.to_columns()?;
+            if check_all_column_from_schema(&cols, input_schema.clone()) {
+                subquery_filters.push(expr.clone());

Review Comment:
   I wonder if you need to clone the `expr` here. It seems like it is already owned, so it could be used directly here and below.
   
   That said, I see this code was just refactored into a different module.



##########
datafusion/optimizer/src/decorrelate_where_exists.rs:
##########
@@ -144,55 +142,68 @@ fn optimize_exists(
     query_info: &SubqueryInfo,
     outer_input: &LogicalPlan,
 ) -> Result<Option<LogicalPlan>> {
-    let subqry_filter = match query_info.query.subquery.as_ref() {
+    let maybe_subqury_filter = match query_info.query.subquery.as_ref() {
         LogicalPlan::Distinct(subqry_distinct) => match subqry_distinct.input.as_ref() {
-            LogicalPlan::Projection(subqry_proj) => {
-                Filter::try_from_plan(&subqry_proj.input)
-            }
+            LogicalPlan::Projection(subqry_proj) => &subqry_proj.input,
             _ => {
-                // Subquery currently only supports distinct or projection
                 return Ok(None);
             }
         },
-        LogicalPlan::Projection(subqry_proj) => Filter::try_from_plan(&subqry_proj.input),
+        LogicalPlan::Projection(subqry_proj) => &subqry_proj.input,
         _ => {
             // Subquery currently only supports distinct or projection
             return Ok(None);
         }
     }
-    .map_err(|e| context!("cannot optimize non-correlated subquery", e))?;
-
-    // split into filters
-    let subqry_filter_exprs = split_conjunction(&subqry_filter.predicate);
-    verify_not_disjunction(&subqry_filter_exprs)?;
-
-    // Grab column names to join on
-    let (col_exprs, other_subqry_exprs) =
-        find_join_exprs(subqry_filter_exprs, subqry_filter.input.schema())?;
-    let (outer_cols, subqry_cols, join_filters) =
-        exprs_to_join_cols(&col_exprs, subqry_filter.input.schema(), false)?;
-    if subqry_cols.is_empty() || outer_cols.is_empty() {
-        // cannot optimize non-correlated subquery
+    .as_ref();
+
+    // extract join filters
+    let (join_filters, subquery_input) = extract_join_filters(maybe_subqury_filter)?;
+    // cannot optimize non-correlated subquery
+    if join_filters.is_empty() {
         return Ok(None);
     }
 
-    // build subquery side of join - the thing the subquery was querying
-    let mut subqry_plan = LogicalPlanBuilder::from(subqry_filter.input.as_ref().clone());
-    if let Some(expr) = conjunction(other_subqry_exprs) {
-        subqry_plan = subqry_plan.filter(expr)? // if the subquery had additional expressions, restore them
-    }
-    let subqry_plan = subqry_plan.build()?;
+    let input_schema = subquery_input.schema();
+    let subquery_cols: BTreeSet<Column> =
+        join_filters
+            .iter()
+            .try_fold(BTreeSet::new(), |mut cols, expr| {
+                let using_cols: Vec<Column> = expr
+                    .to_columns()?
+                    .into_iter()
+                    .filter(|col| input_schema.field_from_column(col).is_ok())
+                    .collect::<_>();
+
+                cols.extend(using_cols);
+                Result::<_, DataFusionError>::Ok(cols)
+            })?;
+
+    let projection_exprs: Vec<Expr> =
+        subquery_cols.into_iter().map(Expr::Column).collect();
+
+    let right = LogicalPlanBuilder::from(subquery_input)
+        .project(projection_exprs)?
+        .build()?;
 
-    let join_keys = (subqry_cols, outer_cols);
+    let join_filter = conjunction(join_filters);
 
     // join our sub query into the main plan
     let join_type = match query_info.negated {
         true => JoinType::LeftAnti,
         false => JoinType::LeftSemi,
     };
+
+    // TODO: add Distinct if the original plan is a Distinct.

Review Comment:
   Is this still a TODO? I see the code above looking into `Distinct` children, but the `Distinct` is not added back.



##########
datafusion/optimizer/tests/integration-test.rs:
##########
@@ -121,8 +121,9 @@ fn semi_join_with_join_filter() -> Result<()> {
     let expected = "Projection: test.col_utf8\
                     \n  LeftSemi Join: test.col_int32 = t2.col_int32 Filter: test.col_uint32 != t2.col_uint32\
                     \n    TableScan: test projection=[col_int32, col_uint32, col_utf8]\
-                    \n    SubqueryAlias: t2\
-                    \n      TableScan: test projection=[col_int32, col_uint32, col_utf8]";
+                    \n    Projection: t2.col_int32, t2.col_uint32\
+                    \n      SubqueryAlias: t2\
+                    \n        TableScan: test projection=[col_int32, col_uint32]";

Review Comment:
   This is a nice improvement too -- the unused column `col_utf8` is filtered out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org