You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/12 19:14:40 UTC

[arrow-datafusion] branch master updated: Add LogicalPlan::SubqueryAlias (#2172)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ee95d41cc Add LogicalPlan::SubqueryAlias (#2172)
ee95d41cc is described below

commit ee95d41cc93a257a21bb43d78ae33d15a9f30048
Author: Andy Grove <ag...@apache.org>
AuthorDate: Tue Apr 12 13:14:34 2022 -0600

    Add LogicalPlan::SubqueryAlias (#2172)
---
 ballista/rust/core/proto/ballista.proto            |  6 +++
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 26 +++++++++++-
 datafusion/core/src/logical_plan/builder.rs        | 14 ++++++-
 datafusion/core/src/logical_plan/plan.rs           | 25 +++++++++++
 .../core/src/optimizer/common_subexpr_eliminate.rs |  1 +
 .../core/src/optimizer/projection_push_down.rs     | 48 +++++++++++++++++++++-
 datafusion/core/src/optimizer/utils.rs             | 13 +++++-
 datafusion/core/src/physical_plan/planner.rs       | 13 +++++-
 datafusion/core/src/sql/planner.rs                 | 40 ++++++++++++------
 9 files changed, 167 insertions(+), 19 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 87c6ccabb..bab783a9e 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -52,6 +52,7 @@ message LogicalPlanNode {
     CreateCatalogSchemaNode create_catalog_schema = 18;
     UnionNode union = 19;
     CreateCatalogNode create_catalog = 20;
+    SubqueryAliasNode subquery_alias = 21;
   }
 }
 
@@ -241,6 +242,11 @@ message SelectionExecNode {
   datafusion.LogicalExprNode expr = 1;
 }
 
+message SubqueryAliasNode { // serialized form of LogicalPlan::SubqueryAlias
+  LogicalPlanNode input = 1; // the plan whose output relation is being renamed
+  string alias = 2; // the new relation name applied to the input
+}
+
 ///////////////////////////////////////////////////////////////////////////////////////////////////
 // Ballista Physical Plan
 ///////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 57cb35dec..a0264271a 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -28,9 +28,8 @@ use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
-
 use datafusion::logical_plan::plan::{
-    Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
+    Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
 };
 use datafusion::logical_plan::{
     Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
@@ -377,6 +376,14 @@ impl AsLogicalPlan for LogicalPlanNode {
                     .build()
                     .map_err(|e| e.into())
             }
+            LogicalPlanType::SubqueryAlias(aliased_relation) => {
+                let input: LogicalPlan =
+                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
+                LogicalPlanBuilder::from(input)
+                    .alias(&aliased_relation.alias)?
+                    .build()
+                    .map_err(|e| e.into())
+            }
             LogicalPlanType::Limit(limit) => {
                 let input: LogicalPlan =
                     into_logical_plan!(limit.input, ctx, extension_codec)?;
@@ -700,6 +707,21 @@ impl AsLogicalPlan for LogicalPlanNode {
                     ))),
                 })
             }
+            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+                let input: protobuf::LogicalPlanNode =
+                    protobuf::LogicalPlanNode::try_from_logical_plan(
+                        input.as_ref(),
+                        extension_codec,
+                    )?;
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
+                        protobuf::SubqueryAliasNode {
+                            input: Some(Box::new(input)),
+                            alias: alias.clone(),
+                        },
+                    ))),
+                })
+            }
             LogicalPlan::Limit(Limit { input, n }) => {
                 let input: protobuf::LogicalPlanNode =
                     protobuf::LogicalPlanNode::try_from_logical_plan(
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index d2f5c0418..c88b25d0a 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -26,7 +26,7 @@ use crate::error::{DataFusionError, Result};
 use crate::logical_plan::expr_schema::ExprSchemable;
 use crate::logical_plan::plan::{
     Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
-    TableScan, ToStringifiedPlan, Union, Window,
+    SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window,
 };
 use crate::optimizer::utils;
 use crate::prelude::*;
@@ -518,6 +518,18 @@ impl LogicalPlanBuilder {
         })))
     }
 
+    /// Wrap this plan in a `SubqueryAlias`, qualifying every field with `alias`
+    pub fn alias(&self, alias: &str) -> Result<Self> {
+        let arrow_schema: Schema = self.schema().as_ref().clone().into();
+        let qualified = DFSchema::try_from_qualified_schema(alias, &arrow_schema)?;
+        let aliased = LogicalPlan::SubqueryAlias(SubqueryAlias {
+            input: Arc::new(self.plan.clone()),
+            alias: alias.to_string(),
+            schema: DFSchemaRef::new(qualified),
+        });
+        Ok(Self::from(aliased))
+    }
+
     /// Add missing sort columns to all downstream projection
     fn add_missing_columns(
         &self,
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 88e0a185b..66307c6ab 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -87,6 +87,17 @@ pub struct Projection {
     pub alias: Option<String>,
 }
 
+/// Gives the relation produced by `input` a new name, as in SQL `FROM t AS a`
+#[derive(Clone)]
+pub struct SubqueryAlias {
+    /// The plan whose output relation is being aliased
+    pub input: Arc<LogicalPlan>,
+    /// The new relation name used to qualify the input's fields
+    pub alias: String,
+    /// The input's schema with its fields qualified by `alias`
+    pub schema: DFSchemaRef,
+}
+
 /// Filters rows from its input that do not match an
 /// expression (essentially a WHERE clause with a predicate
 /// expression).
@@ -372,6 +383,8 @@ pub enum LogicalPlan {
     TableScan(TableScan),
     /// Produces no rows: An empty relation with an empty schema
     EmptyRelation(EmptyRelation),
+    /// Aliased relation provides, or changes, the name of a relation.
+    SubqueryAlias(SubqueryAlias),
     /// Produces the first `n` tuples from its input and discards the rest.
     Limit(Limit),
     /// Creates an external table.
@@ -416,6 +429,7 @@ impl LogicalPlan {
             LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
             LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
             LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
+            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
             LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
                 schema
             }
@@ -464,6 +478,9 @@ impl LogicalPlan {
                 schemas.insert(0, schema);
                 schemas
             }
+            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => {
+                vec![schema]
+            }
             LogicalPlan::Union(Union { schema, .. }) => {
                 vec![schema]
             }
@@ -525,6 +542,7 @@ impl LogicalPlan {
             // plans without expressions
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::SubqueryAlias(_)
             | LogicalPlan::Limit(_)
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateMemoryTable(_)
@@ -553,6 +571,7 @@ impl LogicalPlan {
             LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
             LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
             LogicalPlan::Limit(Limit { input, .. }) => vec![input],
+            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
             LogicalPlan::Extension(extension) => extension.node.inputs(),
             LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
             LogicalPlan::Explain(explain) => vec![&explain.plan],
@@ -701,6 +720,9 @@ impl LogicalPlan {
                 true
             }
             LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
+            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
+                input.accept(visitor)?
+            }
             LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
                 input.accept(visitor)?
             }
@@ -1072,6 +1094,9 @@ impl LogicalPlan {
                         }
                     },
                     LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
+                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
+                        write!(f, "SubqueryAlias: {}", alias)
+                    }
                     LogicalPlan::CreateExternalTable(CreateExternalTable {
                         ref name,
                         ..
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index 3db1ca4d3..39964df4a 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -216,6 +216,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
         | LogicalPlan::TableScan { .. }
         | LogicalPlan::Values(_)
         | LogicalPlan::EmptyRelation(_)
+        | LogicalPlan::SubqueryAlias(_)
         | LogicalPlan::Limit(_)
         | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::Explain { .. }
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 5b61ace11..10bf5d10f 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -21,7 +21,7 @@
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::{
-    Aggregate, Analyze, Join, Projection, TableScan, Window,
+    Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window,
 };
 use crate::logical_plan::{
     build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
@@ -432,6 +432,34 @@ fn optimize_plan(
                 alias: alias.clone(),
             }))
         }
+        LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+            match input.as_ref() {
+                LogicalPlan::TableScan(TableScan { table_name, .. }) => {
+                    let new_required_columns = new_required_columns
+                        .iter()
+                        .map(|c| match &c.relation {
+                            Some(q) if q == alias => Column {
+                                relation: Some(table_name.clone()),
+                                name: c.name.clone(),
+                            },
+                            _ => c.clone(),
+                        })
+                        .collect();
+                    let new_inputs = vec![optimize_plan(
+                        _optimizer,
+                        input,
+                        &new_required_columns,
+                        has_projection,
+                        _execution_props,
+                    )?];
+                    let expr = vec![];
+                    utils::from_plan(plan, &expr, &new_inputs)
+                }
+                _ => Err(DataFusionError::Plan(
+                    "SubqueryAlias should only wrap TableScan".to_string(),
+                )),
+            }
+        }
         // all other nodes: Add any additional columns used by
         // expressions in this node to the list of required columns
         LogicalPlan::Limit(_)
@@ -515,6 +543,24 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn aggregate_group_by_with_table_alias() -> Result<()> {
+        // Columns referenced through the alias `a` must still be pushed down
+        // into the underlying scan as a projection.
+        let plan = LogicalPlanBuilder::from(test_table_scan()?)
+            .alias("a")?
+            .aggregate(vec![col("c")], vec![max(col("b"))])?
+            .build()?;
+
+        let expected = "Aggregate: groupBy=[[#a.c]], aggr=[[MAX(#a.b)]]\
+                        \n  SubqueryAlias: a\
+                        \n    TableScan: test projection=Some([1, 2])";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
     #[test]
     fn aggregate_no_group_by_with_filter() -> Result<()> {
         let table_scan = test_table_scan()?;
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index c01515d35..0dab2d3ed 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -20,7 +20,7 @@
 use super::optimizer::OptimizerRule;
 use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::{
-    Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Window,
+    Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, SubqueryAlias, Window,
 };
 
 use crate::logical_plan::{
@@ -34,6 +34,7 @@ use crate::{
     error::{DataFusionError, Result},
     logical_plan::ExpressionVisitor,
 };
+use datafusion_common::DFSchema;
 use std::{collections::HashSet, sync::Arc};
 
 const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__";
@@ -222,6 +223,16 @@ pub fn from_plan(
             let right = &inputs[1];
             LogicalPlanBuilder::from(left).cross_join(right)?.build()
         }
+        LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
+            let schema = inputs[0].schema().as_ref().clone().into();
+            let schema =
+                DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
+            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
+                alias: alias.clone(),
+                input: Arc::new(inputs[0].clone()),
+                schema,
+            }))
+        }
         LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit {
             n: *n,
             input: Arc::new(inputs[0].clone()),
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 32fa12fb9..98076d136 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -24,7 +24,8 @@ use super::{
 };
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_plan::plan::{
-    Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScan, Window,
+    Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan,
+    Window,
 };
 use crate::logical_plan::{
     unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator,
@@ -785,6 +786,16 @@ impl DefaultPhysicalPlanner {
                     *produce_one_row,
                     SchemaRef::new(schema.as_ref().to_owned().into()),
                 ))),
+                LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
+                    match input.as_ref() {
+                        // The alias has no physical counterpart: plan the wrapped scan
+                        // directly (no need to clone or rename the TableScan node).
+                        LogicalPlan::TableScan(_) => {
+                            self.create_initial_plan(input, session_state).await
+                        }
+                        _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
+                    }
+                }
                 LogicalPlan::Limit(Limit { input, n, .. }) => {
                     let limit = *n;
                     let input = self.create_initial_plan(input, session_state).await?;
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 9a7c5eb44..84ddace9a 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -644,16 +644,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         self.schema_provider.get_table_provider(name.try_into()?),
                     ) {
                         (Some(cte_plan), _) => Ok(cte_plan.clone()),
-                        (_, Some(provider)) => LogicalPlanBuilder::scan(
-                            // take alias into account to support `JOIN table1 as table2`
-                            alias
-                                .as_ref()
-                                .map(|a| a.name.value.as_str())
-                                .unwrap_or(&table_name),
-                            provider,
-                            None,
-                        )?
-                        .build(),
+                        (_, Some(provider)) => {
+                            let scan =
+                                LogicalPlanBuilder::scan(&table_name, provider, None);
+                            let scan = match alias {
+                                Some(ref name) => scan?.alias(name.name.value.as_str()),
+                                _ => scan,
+                            };
+                            scan?.build()
+                        }
                         (None, None) => Err(DataFusionError::Plan(format!(
                             "Table or CTE with name '{}' not found",
                             name
@@ -2492,7 +2491,8 @@ mod tests {
                    FROM lineitem l (a, b, c)";
         let expected = "Projection: #l.a, #l.b, #l.c\
                         \n  Projection: #l.l_item_id AS a, #l.l_description AS b, #l.price AS c, alias=l\
-                        \n    TableScan: l projection=None";
+                        \n    SubqueryAlias: l\
+                        \n      TableScan: lineitem projection=None";
         quick_test(sql, expected);
     }
 
@@ -3458,7 +3458,8 @@ mod tests {
         let expected = "Projection: #person.first_name, #person.id\
         \n  Inner Join: Using #person.id = #person2.id\
         \n    TableScan: person projection=None\
-        \n    TableScan: person2 projection=None";
+        \n    SubqueryAlias: person2\
+        \n      TableScan: person projection=None";
         quick_test(sql, expected);
     }
 
@@ -3471,7 +3472,8 @@ mod tests {
         let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\
         \n  Inner Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\
         \n    TableScan: lineitem projection=None\
-        \n    TableScan: lineitem2 projection=None";
+        \n    SubqueryAlias: lineitem2\
+        \n      TableScan: lineitem projection=None";
         quick_test(sql, expected);
     }
 
@@ -4067,6 +4069,18 @@ mod tests {
         quick_test(sql, expected);
     }
 
+    #[test]
+    fn join_with_aliases() {
+        // Each aliased side of the self-join becomes its own SubqueryAlias over `person`.
+        let sql = "select peeps.id, folks.first_name \
+                   from person as peeps join person as folks on peeps.id = folks.id";
+        let expected = "Projection: #peeps.id, #folks.first_name\
+        \n  Inner Join: #peeps.id = #folks.id\
+        \n    SubqueryAlias: peeps\n      TableScan: person projection=None\
+        \n    SubqueryAlias: folks\n      TableScan: person projection=None";
+        quick_test(sql, expected);
+    }
+
     #[test]
     fn cte_use_same_name_multiple_times() {
         let sql = "with a as (select * from person), a as (select * from orders) select * from a;";