You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/23 18:33:41 UTC

[arrow-datafusion] branch master updated: Support `SubqueryAlias` in optimizer-executor. (#4293)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 07f65bff3 Support `SubqueryAlias` in optimizer-executor. (#4293)
07f65bff3 is described below

commit 07f65bff330677cb273cb2edb6cdfe958e69ce31
Author: jakevin <ja...@gmail.com>
AuthorDate: Thu Nov 24 02:33:34 2022 +0800

    Support `SubqueryAlias` in optimizer-executor. (#4293)
---
 datafusion/core/src/physical_plan/planner.rs     |   7 +-
 datafusion/optimizer/src/projection_push_down.rs | 201 +++++++++++------------
 2 files changed, 98 insertions(+), 110 deletions(-)

diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 7345f44d6..691c935b8 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -969,12 +969,7 @@ impl DefaultPhysicalPlanner {
                     SchemaRef::new(schema.as_ref().to_owned().into()),
                 ))),
                 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
-                    match input.as_ref() {
-                        LogicalPlan::TableScan(..) => {
-                            self.create_initial_plan(input, session_state).await
-                        }
-                        _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
-                    }
+                    self.create_initial_plan(input, session_state).await
                 }
                 LogicalPlan::Limit(Limit { input, skip, fetch, .. }) => {
                     let input = self.create_initial_plan(input, session_state).await?;
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index 5a44247ea..916071bd5 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -19,7 +19,7 @@
 //! loaded into memory
 
 use crate::{OptimizerConfig, OptimizerRule};
-use arrow::datatypes::{Field, Schema};
+use arrow::datatypes::Field;
 use arrow::error::Result as ArrowResult;
 use datafusion_common::{
     Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ToDFSchema,
@@ -34,6 +34,7 @@ use datafusion_expr::{
     utils::{expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan},
     Expr,
 };
+use std::collections::HashMap;
 use std::{
     collections::{BTreeSet, HashSet},
     sync::Arc,
@@ -72,60 +73,6 @@ impl ProjectionPushDown {
     }
 }
 
-fn get_projected_schema(
-    table_name: Option<&String>,
-    schema: &Schema,
-    required_columns: &HashSet<Column>,
-    has_projection: bool,
-) -> Result<(Vec<usize>, DFSchemaRef)> {
-    // once we reach the table scan, we can use the accumulated set of column
-    // names to construct the set of column indexes in the scan
-    //
-    // we discard non-existing columns because some column names are not part of the schema,
-    // e.g. when the column derives from an aggregation
-    //
-    // Use BTreeSet to remove potential duplicates (e.g. union) as
-    // well as to sort the projection to ensure deterministic behavior
-    let mut projection: BTreeSet<usize> = required_columns
-        .iter()
-        .filter(|c| c.relation.is_none() || c.relation.as_ref() == table_name)
-        .map(|c| schema.index_of(&c.name))
-        .filter_map(ArrowResult::ok)
-        .collect();
-
-    if projection.is_empty() {
-        if has_projection && !schema.fields().is_empty() {
-            // Ensure that we are reading at least one column from the table in case the query
-            // does not reference any columns directly such as "SELECT COUNT(1) FROM table",
-            // except when the table is empty (no column)
-            projection.insert(0);
-        } else {
-            // for table scan without projection, we default to return all columns
-            projection = schema
-                .fields()
-                .iter()
-                .enumerate()
-                .map(|(i, _)| i)
-                .collect::<BTreeSet<usize>>();
-        }
-    }
-
-    // create the projected schema
-    let projected_fields: Vec<DFField> = match table_name {
-        Some(qualifier) => projection
-            .iter()
-            .map(|i| DFField::from_qualified(qualifier, schema.fields()[*i].clone()))
-            .collect(),
-        None => projection
-            .iter()
-            .map(|i| DFField::from(schema.fields()[*i].clone()))
-            .collect(),
-    };
-
-    let projection = projection.into_iter().collect::<Vec<_>>();
-    Ok((projection, projected_fields.to_dfschema_ref()?))
-}
-
 /// Recursively transverses the logical plan removing expressions and that are not needed.
 fn optimize_plan(
     _optimizer: &ProjectionPushDown,
@@ -348,28 +295,8 @@ fn optimize_plan(
         }
         // scans:
         // * remove un-used columns from the scan projection
-        LogicalPlan::TableScan(TableScan {
-            table_name,
-            source,
-            filters,
-            fetch: limit,
-            ..
-        }) => {
-            let (projection, projected_schema) = get_projected_schema(
-                Some(table_name),
-                &source.schema(),
-                required_columns,
-                has_projection,
-            )?;
-            // return the table scan with projection
-            Ok(LogicalPlan::TableScan(TableScan {
-                table_name: table_name.clone(),
-                source: source.clone(),
-                projection: Some(projection),
-                projected_schema,
-                filters: filters.clone(),
-                fetch: *limit,
-            }))
+        LogicalPlan::TableScan(scan) => {
+            push_down_scan(scan, &new_required_columns, has_projection)
         }
         LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
             "Unsupported logical plan: Explain must be root of the plan".to_string(),
@@ -441,32 +368,16 @@ fn optimize_plan(
             }))
         }
         LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
-            match input.as_ref() {
-                LogicalPlan::TableScan(TableScan { table_name, .. }) => {
-                    let new_required_columns = new_required_columns
-                        .iter()
-                        .map(|c| match &c.relation {
-                            Some(q) if q == alias => Column {
-                                relation: Some(table_name.clone()),
-                                name: c.name.clone(),
-                            },
-                            _ => c.clone(),
-                        })
-                        .collect();
-                    let new_inputs = vec![optimize_plan(
-                        _optimizer,
-                        input,
-                        &new_required_columns,
-                        has_projection,
-                        _optimizer_config,
-                    )?];
-                    let expr = vec![];
-                    from_plan(plan, &expr, &new_inputs)
-                }
-                _ => Err(DataFusionError::Plan(
-                    "SubqueryAlias should only wrap TableScan".to_string(),
-                )),
-            }
+            let new_required_columns =
+                replace_alias(required_columns, alias, input.schema());
+            let child = optimize_plan(
+                _optimizer,
+                input,
+                &new_required_columns,
+                has_projection,
+                _optimizer_config,
+            )?;
+            from_plan(plan, &plan.expressions(), &[child])
         }
         // all other nodes: Add any additional columns used by
         // expressions in this node to the list of required columns
@@ -532,11 +443,93 @@ fn projection_equal(p: &Projection, p2: &Projection) -> bool {
         && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
 }
 
+fn replace_alias(
+    required_columns: &HashSet<Column>,
+    alias: &str,
+    input_schema: &DFSchemaRef,
+) -> HashSet<Column> {
+    let mut map = HashMap::new();
+    for field in input_schema.fields() {
+        let col = field.qualified_column();
+        let alias_col = Column {
+            relation: Some(alias.to_owned()),
+            name: col.name.clone(),
+        };
+        map.insert(alias_col, col);
+    }
+    required_columns
+        .iter()
+        .map(|col| map.get(col).unwrap_or(col).clone())
+        .collect::<HashSet<_>>()
+}
+
+fn push_down_scan(
+    scan: &TableScan,
+    required_columns: &HashSet<Column>,
+    has_projection: bool,
+) -> Result<LogicalPlan> {
+    // once we reach the table scan, we can use the accumulated set of column
+    // names to construct the set of column indexes in the scan
+    //
+    // we discard non-existing columns because some column names are not part of the schema,
+    // e.g. when the column derives from an aggregation
+    //
+    // Use BTreeSet to remove potential duplicates (e.g. union) as
+    // well as to sort the projection to ensure deterministic behavior
+    let schema = scan.source.schema();
+    let mut projection: BTreeSet<usize> = required_columns
+        .iter()
+        .filter(|c| {
+            c.relation.is_none() || c.relation.as_ref().unwrap() == &scan.table_name
+        })
+        .map(|c| schema.index_of(&c.name))
+        .filter_map(ArrowResult::ok)
+        .collect();
+
+    if projection.is_empty() {
+        if has_projection && !schema.fields().is_empty() {
+            // Ensure that we are reading at least one column from the table in case the query
+            // does not reference any columns directly such as "SELECT COUNT(1) FROM table",
+            // except when the table is empty (no column)
+            projection.insert(0);
+        } else {
+            // for table scan without projection, we default to return all columns
+            projection = scan
+                .source
+                .schema()
+                .fields()
+                .iter()
+                .enumerate()
+                .map(|(i, _)| i)
+                .collect::<BTreeSet<usize>>();
+        }
+    }
+
+    // create the projected schema
+    let projected_fields: Vec<DFField> = projection
+        .iter()
+        .map(|i| DFField::from_qualified(&scan.table_name, schema.fields()[*i].clone()))
+        .collect();
+
+    let projection = projection.into_iter().collect::<Vec<_>>();
+    let projected_schema = projected_fields.to_dfschema_ref()?;
+
+    // return the table scan with projection
+    Ok(LogicalPlan::TableScan(TableScan {
+        table_name: scan.table_name.clone(),
+        source: scan.source.clone(),
+        projection: Some(projection),
+        projected_schema,
+        filters: scan.filters.clone(),
+        fetch: scan.fetch,
+    }))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::test::*;
-    use arrow::datatypes::DataType;
+    use arrow::datatypes::{DataType, Schema};
     use datafusion_expr::expr::Cast;
     use datafusion_expr::{
         col, count, lit,