You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/13 13:46:40 UTC
[arrow-datafusion] branch master updated: dedup using join column in wildcard expansion (#678)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 75a376f dedup using join column in wildcard expansion (#678)
75a376f is described below
commit 75a376f0ff2e8236c07a3b0a16374b7e3855c194
Author: QP Hou <qp...@scribd.com>
AuthorDate: Tue Jul 13 06:46:31 2021 -0700
dedup using join column in wildcard expansion (#678)
* dedup using join column in wildcard expansion
* reuse expand_wildcard in logical plan builder
---
datafusion/src/logical_plan/builder.rs | 72 +++++++++++++++++++++++++++++++---
datafusion/src/logical_plan/expr.rs | 2 +-
datafusion/src/logical_plan/mod.rs | 2 +-
datafusion/src/logical_plan/plan.rs | 2 +-
datafusion/src/sql/planner.rs | 34 +++++++++++++---
datafusion/src/sql/utils.rs | 14 +------
6 files changed, 99 insertions(+), 27 deletions(-)
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 41f29c4..85c4aea 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -17,7 +17,10 @@
//! This module provides a builder for creating LogicalPlans
-use std::{collections::HashMap, sync::Arc};
+use std::{
+ collections::{HashMap, HashSet},
+ sync::Arc,
+};
use arrow::{
datatypes::{Schema, SchemaRef},
@@ -220,10 +223,7 @@ impl LogicalPlanBuilder {
for e in expr {
match e {
Expr::Wildcard => {
- (0..input_schema.fields().len()).for_each(|i| {
- projected_expr
- .push(Expr::Column(input_schema.field(i).qualified_column()))
- });
+ projected_expr.extend(expand_wildcard(input_schema, &self.plan)?)
}
_ => projected_expr
.push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
@@ -508,6 +508,47 @@ pub fn union_with_alias(
})
}
+/// Expands an `Expr::Wildcard` into one `Expr::Column` per schema field, emitting only one column per USING join condition.
+pub(crate) fn expand_wildcard(
+ schema: &DFSchema,
+ plan: &LogicalPlan,
+) -> Result<Vec<Expr>> {
+ let using_columns = plan.using_columns()?;
+ let columns_to_skip = using_columns
+ .into_iter()
+ // For each USING join condition, keep one column in the projection and mark the rest to be skipped
+ .map(|cols| {
+ let mut cols = cols.into_iter().collect::<Vec<_>>();
+ // Sort the join columns so the same qualified column is kept every time:
+ // the first one in `Column` ordering survives, all later ones are skipped.
+ cols.sort();
+ cols.into_iter().skip(1)
+ })
+ .flatten()
+ .collect::<HashSet<_>>();
+
+ if columns_to_skip.is_empty() {
+ Ok(schema
+ .fields()
+ .iter()
+ .map(|f| Expr::Column(f.qualified_column()))
+ .collect::<Vec<Expr>>())
+ } else {
+ Ok(schema
+ .fields()
+ .iter()
+ .filter_map(|f| {
+ let col = f.qualified_column();
+ if !columns_to_skip.contains(&col) {
+ Some(Expr::Column(col))
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<Expr>>())
+ }
+}
+
#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field};
@@ -588,6 +629,27 @@ mod tests {
}
#[test]
+ fn plan_using_join_wildcard_projection() -> Result<()> {
+ let t2 = LogicalPlanBuilder::scan_empty(Some("t2"), &employee_schema(), None)?
+ .build()?;
+
+ let plan = LogicalPlanBuilder::scan_empty(Some("t1"), &employee_schema(), None)?
+ .join_using(&t2, JoinType::Inner, vec!["id"])?
+ .project(vec![Expr::Wildcard])?
+ .build()?;
+
+ // the USING join key `id` must appear only once in the projection (as #t1.id; #t2.id is dropped)
+ let expected = "Projection: #t1.id, #t1.first_name, #t1.last_name, #t1.state, #t1.salary, #t2.first_name, #t2.last_name, #t2.state, #t2.salary\
+ \n Join: Using #t1.id = #t2.id\
+ \n TableScan: t1 projection=None\
+ \n TableScan: t2 projection=None";
+
+ assert_eq!(expected, format!("{:?}", plan));
+
+ Ok(())
+ }
+
+ #[test]
fn plan_builder_union_combined_single_union() -> Result<()> {
let plan = LogicalPlanBuilder::scan_empty(
Some("employee_csv"),
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 59c9979..2eee140 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -34,7 +34,7 @@ use std::fmt;
use std::sync::Arc;
/// A named reference to a qualified field in a schema.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
/// relation/table name.
pub relation: Option<String>,
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 2c751ab..f381e31 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -21,7 +21,7 @@
//! Logical query plans can then be optimized and executed directly, or translated into
//! physical query plans and executed.
-mod builder;
+pub(crate) mod builder;
mod dfschema;
mod display;
mod expr;
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index b954b6a..2504dfa 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -25,8 +25,8 @@ use crate::error::DataFusionError;
use crate::logical_plan::dfschema::DFSchemaRef;
use crate::sql::parser::FileType;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use std::collections::HashSet;
use std::{
+ collections::HashSet,
fmt::{self, Display},
sync::Arc,
};
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index f89ba3f..41b4e20 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -27,8 +27,9 @@ use crate::datasource::TableProvider;
use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
- and, col, lit, normalize_col, union_with_alias, Column, DFSchema, Expr, LogicalPlan,
- LogicalPlanBuilder, Operator, PlanType, StringifiedPlan, ToDFSchema,
+ and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
+ DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, StringifiedPlan,
+ ToDFSchema,
};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
@@ -56,7 +57,7 @@ use sqlparser::parser::ParserError::ParserError;
use super::{
parser::DFParser,
utils::{
- can_columns_satisfy_exprs, expand_wildcard, expr_as_column_expr, extract_aliases,
+ can_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
find_aggregate_exprs, find_column_exprs, find_window_exprs,
group_window_expr_by_sort_keys, rebase_expr, resolve_aliases_to_exprs,
resolve_positions_to_exprs,
@@ -687,9 +688,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.iter()
.map(|expr| self.sql_select_to_rex(expr, input_schema))
.collect::<Result<Vec<Expr>>>()?
- .iter()
- .flat_map(|expr| expand_wildcard(expr, input_schema))
- .map(|expr| normalize_col(expr, plan))
+ .into_iter()
+ .map(|expr| {
+ Ok(match expr {
+ Expr::Wildcard => expand_wildcard(input_schema, plan)?,
+ _ => vec![normalize_col(expr, plan)?],
+ })
+ })
+ .flat_map(|res| match res {
+ Ok(v) => v.into_iter().map(Ok).collect(),
+ Err(e) => vec![Err(e)],
+ })
.collect::<Result<Vec<Expr>>>()
}
@@ -2774,6 +2783,19 @@ mod tests {
}
#[test]
+ fn project_wildcard_on_join_with_using() {
+ let sql = "SELECT * \
+ FROM lineitem \
+ JOIN lineitem as lineitem2 \
+ USING (l_item_id)";
+ let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\
+ \n Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\
+ \n TableScan: lineitem projection=None\
+ \n TableScan: lineitem2 projection=None";
+ quick_test(sql, expected);
+ }
+
+ #[test]
fn equijoin_explicit_syntax_3_tables() {
let sql = "SELECT id, order_id, l_description \
FROM person \
diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs
index 2824336..41bcd20 100644
--- a/datafusion/src/sql/utils.rs
+++ b/datafusion/src/sql/utils.rs
@@ -17,7 +17,7 @@
//! SQL Utility Functions
-use crate::logical_plan::{DFSchema, Expr, LogicalPlan};
+use crate::logical_plan::{Expr, LogicalPlan};
use crate::scalar::ScalarValue;
use crate::{
error::{DataFusionError, Result},
@@ -25,18 +25,6 @@ use crate::{
};
use std::collections::HashMap;
-/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
-pub(crate) fn expand_wildcard(expr: &Expr, schema: &DFSchema) -> Vec<Expr> {
- match expr {
- Expr::Wildcard => schema
- .fields()
- .iter()
- .map(|f| Expr::Column(f.qualified_column()))
- .collect::<Vec<Expr>>(),
- _ => vec![expr.clone()],
- }
-}
-
/// Collect all deeply nested `Expr::AggregateFunction` and
/// `Expr::AggregateUDF`. They are returned in order of occurrence (depth
/// first), with duplicates omitted.