You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/12 19:14:40 UTC
[arrow-datafusion] branch master updated: Add LogicalPlan::SubqueryAlias (#2172)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ee95d41cc Add LogicalPlan::SubqueryAlias (#2172)
ee95d41cc is described below
commit ee95d41cc93a257a21bb43d78ae33d15a9f30048
Author: Andy Grove <ag...@apache.org>
AuthorDate: Tue Apr 12 13:14:34 2022 -0600
Add LogicalPlan::SubqueryAlias (#2172)
---
ballista/rust/core/proto/ballista.proto | 6 +++
ballista/rust/core/src/serde/logical_plan/mod.rs | 26 +++++++++++-
datafusion/core/src/logical_plan/builder.rs | 14 ++++++-
datafusion/core/src/logical_plan/plan.rs | 25 +++++++++++
.../core/src/optimizer/common_subexpr_eliminate.rs | 1 +
.../core/src/optimizer/projection_push_down.rs | 48 +++++++++++++++++++++-
datafusion/core/src/optimizer/utils.rs | 13 +++++-
datafusion/core/src/physical_plan/planner.rs | 13 +++++-
datafusion/core/src/sql/planner.rs | 40 ++++++++++++------
9 files changed, 167 insertions(+), 19 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 87c6ccabb..bab783a9e 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -52,6 +52,7 @@ message LogicalPlanNode {
CreateCatalogSchemaNode create_catalog_schema = 18;
UnionNode union = 19;
CreateCatalogNode create_catalog = 20;
+ SubqueryAliasNode subquery_alias = 21;
}
}
@@ -241,6 +242,11 @@ message SelectionExecNode {
datafusion.LogicalExprNode expr = 1;
}
+ message SubqueryAliasNode { // Serialized form of a LogicalPlan::SubqueryAlias node.
+ LogicalPlanNode input = 1; // The plan being aliased.
+ string alias = 2; // The alias applied to `input`.
+ }
+
///////////////////////////////////////////////////////////////////////////////////////////////////
// Ballista Physical Plan
///////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 57cb35dec..a0264271a 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -28,9 +28,8 @@ use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
-
use datafusion::logical_plan::plan::{
- Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
+ Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
};
use datafusion::logical_plan::{
Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
@@ -377,6 +376,14 @@ impl AsLogicalPlan for LogicalPlanNode {
.build()
.map_err(|e| e.into())
}
+ LogicalPlanType::SubqueryAlias(aliased_relation) => {
+ let input: LogicalPlan =
+ into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
+ LogicalPlanBuilder::from(input)
+ .alias(&aliased_relation.alias)?
+ .build()
+ .map_err(|e| e.into())
+ }
LogicalPlanType::Limit(limit) => {
let input: LogicalPlan =
into_logical_plan!(limit.input, ctx, extension_codec)?;
@@ -700,6 +707,21 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
+ LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+ let input: protobuf::LogicalPlanNode =
+ protobuf::LogicalPlanNode::try_from_logical_plan(
+ input.as_ref(),
+ extension_codec,
+ )?;
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
+ protobuf::SubqueryAliasNode {
+ input: Some(Box::new(input)),
+ alias: alias.clone(),
+ },
+ ))),
+ })
+ }
LogicalPlan::Limit(Limit { input, n }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index d2f5c0418..c88b25d0a 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -26,7 +26,7 @@ use crate::error::{DataFusionError, Result};
use crate::logical_plan::expr_schema::ExprSchemable;
use crate::logical_plan::plan::{
Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
- TableScan, ToStringifiedPlan, Union, Window,
+ SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window,
};
use crate::optimizer::utils;
use crate::prelude::*;
@@ -518,6 +518,18 @@ impl LogicalPlanBuilder {
})))
}
+ /// Apply an alias: wrap the current plan in a `LogicalPlan::SubqueryAlias` whose schema is rebuilt with `alias` as the qualifier
+ pub fn alias(&self, alias: &str) -> Result<Self> {
+ let schema: Schema = self.schema().as_ref().clone().into(); // copy the current plan's schema, converted to a plain `Schema`
+ let schema =
+ DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?); // re-qualify with `alias`; a failure is returned to the caller via `?`
+ Ok(Self::from(LogicalPlan::SubqueryAlias(SubqueryAlias {
+ input: Arc::new(self.plan.clone()), // the builder is only borrowed, so the wrapped plan is a clone
+ alias: alias.to_string(),
+ schema,
+ })))
+ }
+
/// Add missing sort columns to all downstream projection
fn add_missing_columns(
&self,
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 88e0a185b..66307c6ab 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -87,6 +87,17 @@ pub struct Projection {
pub alias: Option<String>,
}
+ /// Logical plan node that gives its input relation an alias
+ #[derive(Clone)]
+ pub struct SubqueryAlias {
+ /// The logical plan being aliased
+ pub input: Arc<LogicalPlan>,
+ /// The name under which the input relation is exposed
+ pub alias: String,
+ /// The schema reported by this node, with qualified field names (built via `DFSchema::try_from_qualified_schema` using `alias` as the qualifier)
+ pub schema: DFSchemaRef,
+ }
+
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
@@ -372,6 +383,8 @@ pub enum LogicalPlan {
TableScan(TableScan),
/// Produces no rows: An empty relation with an empty schema
EmptyRelation(EmptyRelation),
+ /// Aliased relation provides, or changes, the name of a relation.
+ SubqueryAlias(SubqueryAlias),
/// Produces the first `n` tuples from its input and discards the rest.
Limit(Limit),
/// Creates an external table.
@@ -416,6 +429,7 @@ impl LogicalPlan {
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
+ LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
schema
}
@@ -464,6 +478,9 @@ impl LogicalPlan {
schemas.insert(0, schema);
schemas
}
+ LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => {
+ vec![schema]
+ }
LogicalPlan::Union(Union { schema, .. }) => {
vec![schema]
}
@@ -525,6 +542,7 @@ impl LogicalPlan {
// plans without expressions
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
@@ -553,6 +571,7 @@ impl LogicalPlan {
LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
+ LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
LogicalPlan::Explain(explain) => vec![&explain.plan],
@@ -701,6 +720,9 @@ impl LogicalPlan {
true
}
LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
+ input.accept(visitor)?
+ }
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
input.accept(visitor)?
}
@@ -1072,6 +1094,9 @@ impl LogicalPlan {
}
},
LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
+ LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
+ write!(f, "SubqueryAlias: {}", alias)
+ }
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref name,
..
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index 3db1ca4d3..39964df4a 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -216,6 +216,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::TableScan { .. }
| LogicalPlan::Values(_)
| LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::Explain { .. }
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 5b61ace11..10bf5d10f 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -21,7 +21,7 @@
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{
- Aggregate, Analyze, Join, Projection, TableScan, Window,
+ Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window,
};
use crate::logical_plan::{
build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
@@ -432,6 +432,34 @@ fn optimize_plan(
alias: alias.clone(),
}))
}
+ LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+ match input.as_ref() {
+ LogicalPlan::TableScan(TableScan { table_name, .. }) => {
+ let new_required_columns = new_required_columns
+ .iter()
+ .map(|c| match &c.relation {
+ Some(q) if q == alias => Column {
+ relation: Some(table_name.clone()),
+ name: c.name.clone(),
+ },
+ _ => c.clone(),
+ })
+ .collect();
+ let new_inputs = vec![optimize_plan(
+ _optimizer,
+ input,
+ &new_required_columns,
+ has_projection,
+ _execution_props,
+ )?];
+ let expr = vec![];
+ utils::from_plan(plan, &expr, &new_inputs)
+ }
+ _ => Err(DataFusionError::Plan(
+ "SubqueryAlias should only wrap TableScan".to_string(),
+ )),
+ }
+ }
// all other nodes: Add any additional columns used by
// expressions in this node to the list of required columns
LogicalPlan::Limit(_)
@@ -515,6 +543,24 @@ mod tests {
Ok(())
}
+ #[test]
+ fn aggregate_group_by_with_table_alias() -> Result<()> { // optimizes an aggregate whose input scan is wrapped in a table alias
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan) // expose the scan as `a`, then group by `c` and take MAX of `b`
+ .alias("a")?
+ .aggregate(vec![col("c")], vec![max(col("b"))])?
+ .build()?;
+
+ let expected = "Aggregate: groupBy=[[#a.c]], aggr=[[MAX(#a.b)]]\
+ \n SubqueryAlias: a\
+ \n TableScan: test projection=Some([1, 2])"; // the scan of `test` below the alias is expected to carry projection [1, 2]
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
#[test]
fn aggregate_no_group_by_with_filter() -> Result<()> {
let table_scan = test_table_scan()?;
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index c01515d35..0dab2d3ed 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -20,7 +20,7 @@
use super::optimizer::OptimizerRule;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{
- Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Window,
+ Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, SubqueryAlias, Window,
};
use crate::logical_plan::{
@@ -34,6 +34,7 @@ use crate::{
error::{DataFusionError, Result},
logical_plan::ExpressionVisitor,
};
+use datafusion_common::DFSchema;
use std::{collections::HashSet, sync::Arc};
const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__";
@@ -222,6 +223,16 @@ pub fn from_plan(
let right = &inputs[1];
LogicalPlanBuilder::from(left).cross_join(right)?.build()
}
+ LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
+ let schema = inputs[0].schema().as_ref().clone().into();
+ let schema =
+ DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
+ Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
+ alias: alias.clone(),
+ input: Arc::new(inputs[0].clone()),
+ schema,
+ }))
+ }
LogicalPlan::Limit(Limit { n, .. }) => Ok(LogicalPlan::Limit(Limit {
n: *n,
input: Arc::new(inputs[0].clone()),
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 32fa12fb9..98076d136 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -24,7 +24,8 @@ use super::{
};
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_plan::plan::{
- Aggregate, EmptyRelation, Filter, Join, Projection, Sort, TableScan, Window,
+ Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan,
+ Window,
};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator,
@@ -785,6 +786,16 @@ impl DefaultPhysicalPlanner {
*produce_one_row,
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
+ LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+ match input.as_ref() {
+ LogicalPlan::TableScan(scan) => {
+ let mut scan = scan.clone();
+ scan.table_name = alias.clone();
+ self.create_initial_plan(input, session_state).await
+ }
+ _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
+ }
+ }
LogicalPlan::Limit(Limit { input, n, .. }) => {
let limit = *n;
let input = self.create_initial_plan(input, session_state).await?;
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 9a7c5eb44..84ddace9a 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -644,16 +644,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.schema_provider.get_table_provider(name.try_into()?),
) {
(Some(cte_plan), _) => Ok(cte_plan.clone()),
- (_, Some(provider)) => LogicalPlanBuilder::scan(
- // take alias into account to support `JOIN table1 as table2`
- alias
- .as_ref()
- .map(|a| a.name.value.as_str())
- .unwrap_or(&table_name),
- provider,
- None,
- )?
- .build(),
+ (_, Some(provider)) => {
+ let scan =
+ LogicalPlanBuilder::scan(&table_name, provider, None);
+ let scan = match alias {
+ Some(ref name) => scan?.alias(name.name.value.as_str()),
+ _ => scan,
+ };
+ scan?.build()
+ }
(None, None) => Err(DataFusionError::Plan(format!(
"Table or CTE with name '{}' not found",
name
@@ -2492,7 +2491,8 @@ mod tests {
FROM lineitem l (a, b, c)";
let expected = "Projection: #l.a, #l.b, #l.c\
\n Projection: #l.l_item_id AS a, #l.l_description AS b, #l.price AS c, alias=l\
- \n TableScan: l projection=None";
+ \n SubqueryAlias: l\
+ \n TableScan: lineitem projection=None";
quick_test(sql, expected);
}
@@ -3458,7 +3458,8 @@ mod tests {
let expected = "Projection: #person.first_name, #person.id\
\n Inner Join: Using #person.id = #person2.id\
\n TableScan: person projection=None\
- \n TableScan: person2 projection=None";
+ \n SubqueryAlias: person2\
+ \n TableScan: person projection=None";
quick_test(sql, expected);
}
@@ -3471,7 +3472,8 @@ mod tests {
let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\
\n Inner Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\
\n TableScan: lineitem projection=None\
- \n TableScan: lineitem2 projection=None";
+ \n SubqueryAlias: lineitem2\
+ \n TableScan: lineitem projection=None";
quick_test(sql, expected);
}
@@ -4067,6 +4069,18 @@ mod tests {
quick_test(sql, expected);
}
+ #[test]
+ fn join_with_aliases() { // self-join of `person` under two different aliases
+ let sql = "select peeps.id, folks.first_name from person as peeps join person as folks on peeps.id = folks.id";
+ let expected = "Projection: #peeps.id, #folks.first_name\
+ \n Inner Join: #peeps.id = #folks.id\
+ \n SubqueryAlias: peeps\
+ \n TableScan: person projection=None\
+ \n SubqueryAlias: folks\
+ \n TableScan: person projection=None"; // each alias is expected as a SubqueryAlias node over a scan named `person`
+ quick_test(sql, expected);
+ }
+
#[test]
fn cte_use_same_name_multiple_times() {
let sql = "with a as (select * from person), a as (select * from orders) select * from a;";