You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "jackwener (via GitHub)" <gi...@apache.org> on 2023/05/30 05:31:16 UTC

[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #6457: Support wider range of Subquery

jackwener commented on code in PR #6457:
URL: https://github.com/apache/arrow-datafusion/pull/6457#discussion_r1209685165


##########
datafusion/optimizer/src/utils.rs:
##########
@@ -409,6 +389,235 @@ pub fn log_plan(description: &str, plan: &LogicalPlan) {
     trace!("{description}::\n{}\n", plan.display_indent_schema());
 }
 
+/// This struct rewrite the sub query plan by pull up the correlated expressions(contains outer reference columns) from the inner subquery's [Filter].
+/// It adds the inner reference columns to the 'Projection' or 'Aggregate' of the subquery if they are missing, so that they can be evaluated by the parent operator as the join condition.
+pub struct PullUpCorrelatedExpr {

Review Comment:
   🚀 Great job.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -402,6 +403,78 @@ impl LogicalPlan {
         Ok(using_columns)
     }
 
+    pub fn head_output_expr(&self) -> Result<Option<Expr>> {

Review Comment:
   Add a comment to explain what it does.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -402,6 +403,78 @@ impl LogicalPlan {
         Ok(using_columns)
     }
 
+    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
+        match self {
+            LogicalPlan::Projection(projection) => {
+                Ok(Some(projection.expr.as_slice()[0].clone()))
+            }
+            LogicalPlan::Aggregate(agg) => {
+                if agg.group_expr.is_empty() {
+                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
+                } else {
+                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
+                }
+            }
+            LogicalPlan::Filter(filter) => filter.input.head_output_expr(),
+            LogicalPlan::Distinct(distinct) => distinct.input.head_output_expr(),
+            LogicalPlan::Sort(sort) => sort.input.head_output_expr(),
+            LogicalPlan::Limit(limit) => limit.input.head_output_expr(),
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                join_type,
+                ..
+            }) => match join_type {
+                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
+                    if left.schema().fields().is_empty() {
+                        right.head_output_expr()
+                    } else {
+                        left.head_output_expr()
+                    }
+                }
+                JoinType::LeftSemi | JoinType::LeftAnti => left.head_output_expr(),
+                JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
+            },
+            LogicalPlan::CrossJoin(cross) => {
+                if cross.left.schema().fields().is_empty() {
+                    cross.right.head_output_expr()
+                } else {
+                    cross.left.head_output_expr()
+                }
+            }
+            LogicalPlan::Repartition(repartition) => repartition.input.head_output_expr(),
+            LogicalPlan::Window(window) => window.input.head_output_expr(),

Review Comment:
   ```suggestion
   ```



##########
datafusion/core/tests/tpcds_planning.rs:
##########
@@ -557,7 +557,6 @@ async fn tpcds_physical_q5() -> Result<()> {
     create_physical_plan(5).await
 }
 
-#[ignore] // Physical plan does not support logical expression (<subquery>)

Review Comment:
   🎉



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2030,7 +2030,7 @@ async fn subquery_to_join_with_both_side_expr() -> Result<()> {
         "  LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
         "    TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
         "    SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
-        "      Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
+        "      Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS t2.t2_id + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",

Review Comment:
   I can't figure out why the double alias occurs here.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -402,6 +403,78 @@ impl LogicalPlan {
         Ok(using_columns)
     }
 
+    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
+        match self {
+            LogicalPlan::Projection(projection) => {
+                Ok(Some(projection.expr.as_slice()[0].clone()))
+            }
+            LogicalPlan::Aggregate(agg) => {
+                if agg.group_expr.is_empty() {
+                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
+                } else {
+                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
+                }
+            }
+            LogicalPlan::Filter(filter) => filter.input.head_output_expr(),
+            LogicalPlan::Distinct(distinct) => distinct.input.head_output_expr(),
+            LogicalPlan::Sort(sort) => sort.input.head_output_expr(),
+            LogicalPlan::Limit(limit) => limit.input.head_output_expr(),

Review Comment:
   ```suggestion
               LogicalPlan::Filter(Filter { input, .. })
               | LogicalPlan::Distinct(Distinct { input, .. })
               | LogicalPlan::Sort(Sort { input, .. })
               | LogicalPlan::Limit(Limit { input, .. })
               | LogicalPlan::Repartition(Repartition { input, .. })
               | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org