You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/23 18:33:41 UTC
[arrow-datafusion] branch master updated: Support `SubqueryAlias` in optimizer-executor. (#4293)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 07f65bff3 Support `SubqueryAlias` in optimizer-executor. (#4293)
07f65bff3 is described below
commit 07f65bff330677cb273cb2edb6cdfe958e69ce31
Author: jakevin <ja...@gmail.com>
AuthorDate: Thu Nov 24 02:33:34 2022 +0800
Support `SubqueryAlias` in optimizer-executor. (#4293)
---
datafusion/core/src/physical_plan/planner.rs | 7 +-
datafusion/optimizer/src/projection_push_down.rs | 201 +++++++++++------------
2 files changed, 98 insertions(+), 110 deletions(-)
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 7345f44d6..691c935b8 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -969,12 +969,7 @@ impl DefaultPhysicalPlanner {
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
- match input.as_ref() {
- LogicalPlan::TableScan(..) => {
- self.create_initial_plan(input, session_state).await
- }
- _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
- }
+ self.create_initial_plan(input, session_state).await
}
LogicalPlan::Limit(Limit { input, skip, fetch, .. }) => {
let input = self.create_initial_plan(input, session_state).await?;
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index 5a44247ea..916071bd5 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -19,7 +19,7 @@
//! loaded into memory
use crate::{OptimizerConfig, OptimizerRule};
-use arrow::datatypes::{Field, Schema};
+use arrow::datatypes::Field;
use arrow::error::Result as ArrowResult;
use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ToDFSchema,
@@ -34,6 +34,7 @@ use datafusion_expr::{
utils::{expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan},
Expr,
};
+use std::collections::HashMap;
use std::{
collections::{BTreeSet, HashSet},
sync::Arc,
@@ -72,60 +73,6 @@ impl ProjectionPushDown {
}
}
-fn get_projected_schema(
- table_name: Option<&String>,
- schema: &Schema,
- required_columns: &HashSet<Column>,
- has_projection: bool,
-) -> Result<(Vec<usize>, DFSchemaRef)> {
- // once we reach the table scan, we can use the accumulated set of column
- // names to construct the set of column indexes in the scan
- //
- // we discard non-existing columns because some column names are not part of the schema,
- // e.g. when the column derives from an aggregation
- //
- // Use BTreeSet to remove potential duplicates (e.g. union) as
- // well as to sort the projection to ensure deterministic behavior
- let mut projection: BTreeSet<usize> = required_columns
- .iter()
- .filter(|c| c.relation.is_none() || c.relation.as_ref() == table_name)
- .map(|c| schema.index_of(&c.name))
- .filter_map(ArrowResult::ok)
- .collect();
-
- if projection.is_empty() {
- if has_projection && !schema.fields().is_empty() {
- // Ensure that we are reading at least one column from the table in case the query
- // does not reference any columns directly such as "SELECT COUNT(1) FROM table",
- // except when the table is empty (no column)
- projection.insert(0);
- } else {
- // for table scan without projection, we default to return all columns
- projection = schema
- .fields()
- .iter()
- .enumerate()
- .map(|(i, _)| i)
- .collect::<BTreeSet<usize>>();
- }
- }
-
- // create the projected schema
- let projected_fields: Vec<DFField> = match table_name {
- Some(qualifier) => projection
- .iter()
- .map(|i| DFField::from_qualified(qualifier, schema.fields()[*i].clone()))
- .collect(),
- None => projection
- .iter()
- .map(|i| DFField::from(schema.fields()[*i].clone()))
- .collect(),
- };
-
- let projection = projection.into_iter().collect::<Vec<_>>();
- Ok((projection, projected_fields.to_dfschema_ref()?))
-}
-
/// Recursively traverses the logical plan, removing expressions that are not needed.
fn optimize_plan(
_optimizer: &ProjectionPushDown,
@@ -348,28 +295,8 @@ fn optimize_plan(
}
// scans:
// * remove un-used columns from the scan projection
- LogicalPlan::TableScan(TableScan {
- table_name,
- source,
- filters,
- fetch: limit,
- ..
- }) => {
- let (projection, projected_schema) = get_projected_schema(
- Some(table_name),
- &source.schema(),
- required_columns,
- has_projection,
- )?;
- // return the table scan with projection
- Ok(LogicalPlan::TableScan(TableScan {
- table_name: table_name.clone(),
- source: source.clone(),
- projection: Some(projection),
- projected_schema,
- filters: filters.clone(),
- fetch: *limit,
- }))
+ LogicalPlan::TableScan(scan) => {
+ push_down_scan(scan, &new_required_columns, has_projection)
}
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
@@ -441,32 +368,16 @@ fn optimize_plan(
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
- match input.as_ref() {
- LogicalPlan::TableScan(TableScan { table_name, .. }) => {
- let new_required_columns = new_required_columns
- .iter()
- .map(|c| match &c.relation {
- Some(q) if q == alias => Column {
- relation: Some(table_name.clone()),
- name: c.name.clone(),
- },
- _ => c.clone(),
- })
- .collect();
- let new_inputs = vec![optimize_plan(
- _optimizer,
- input,
- &new_required_columns,
- has_projection,
- _optimizer_config,
- )?];
- let expr = vec![];
- from_plan(plan, &expr, &new_inputs)
- }
- _ => Err(DataFusionError::Plan(
- "SubqueryAlias should only wrap TableScan".to_string(),
- )),
- }
+ let new_required_columns =
+ replace_alias(required_columns, alias, input.schema());
+ let child = optimize_plan(
+ _optimizer,
+ input,
+ &new_required_columns,
+ has_projection,
+ _optimizer_config,
+ )?;
+ from_plan(plan, &plan.expressions(), &[child])
}
// all other nodes: Add any additional columns used by
// expressions in this node to the list of required columns
@@ -532,11 +443,93 @@ fn projection_equal(p: &Projection, p2: &Projection) -> bool {
&& p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
}
+fn replace_alias(
+ required_columns: &HashSet<Column>,
+ alias: &str,
+ input_schema: &DFSchemaRef,
+) -> HashSet<Column> {
+ let mut map = HashMap::new();
+ for field in input_schema.fields() {
+ let col = field.qualified_column();
+ let alias_col = Column {
+ relation: Some(alias.to_owned()),
+ name: col.name.clone(),
+ };
+ map.insert(alias_col, col);
+ }
+ required_columns
+ .iter()
+ .map(|col| map.get(col).unwrap_or(col).clone())
+ .collect::<HashSet<_>>()
+}
+
+fn push_down_scan(
+ scan: &TableScan,
+ required_columns: &HashSet<Column>,
+ has_projection: bool,
+) -> Result<LogicalPlan> {
+ // once we reach the table scan, we can use the accumulated set of column
+ // names to construct the set of column indexes in the scan
+ //
+ // we discard non-existing columns because some column names are not part of the schema,
+ // e.g. when the column derives from an aggregation
+ //
+ // Use BTreeSet to remove potential duplicates (e.g. union) as
+ // well as to sort the projection to ensure deterministic behavior
+ let schema = scan.source.schema();
+ let mut projection: BTreeSet<usize> = required_columns
+ .iter()
+ .filter(|c| {
+ c.relation.is_none() || c.relation.as_ref().unwrap() == &scan.table_name
+ })
+ .map(|c| schema.index_of(&c.name))
+ .filter_map(ArrowResult::ok)
+ .collect();
+
+ if projection.is_empty() {
+ if has_projection && !schema.fields().is_empty() {
+ // Ensure that we are reading at least one column from the table in case the query
+ // does not reference any columns directly such as "SELECT COUNT(1) FROM table",
+ // except when the table is empty (no column)
+ projection.insert(0);
+ } else {
+ // for table scan without projection, we default to return all columns
+ projection = scan
+ .source
+ .schema()
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(i, _)| i)
+ .collect::<BTreeSet<usize>>();
+ }
+ }
+
+ // create the projected schema
+ let projected_fields: Vec<DFField> = projection
+ .iter()
+ .map(|i| DFField::from_qualified(&scan.table_name, schema.fields()[*i].clone()))
+ .collect();
+
+ let projection = projection.into_iter().collect::<Vec<_>>();
+ let projected_schema = projected_fields.to_dfschema_ref()?;
+
+ // return the table scan with projection
+ Ok(LogicalPlan::TableScan(TableScan {
+ table_name: scan.table_name.clone(),
+ source: scan.source.clone(),
+ projection: Some(projection),
+ projected_schema,
+ filters: scan.filters.clone(),
+ fetch: scan.fetch,
+ }))
+}
+
#[cfg(test)]
mod tests {
use super::*;
use crate::test::*;
- use arrow::datatypes::DataType;
+ use arrow::datatypes::{DataType, Schema};
use datafusion_expr::expr::Cast;
use datafusion_expr::{
col, count, lit,