You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/08 22:38:11 UTC
[arrow-datafusion] branch master updated: Support wildcard select on multiple column using joins (#4840)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 63ae2b9eb Support wildcard select on multiple column using joins (#4840)
63ae2b9eb is described below
commit 63ae2b9ebbe853648492576cda457a3049bbd2dc
Author: Jeffrey <22...@users.noreply.github.com>
AuthorDate: Mon Jan 9 09:38:05 2023 +1100
Support wildcard select on multiple column using joins (#4840)
---
datafusion/expr/src/logical_plan/builder.rs | 5 ++-
datafusion/expr/src/utils.rs | 24 +++++++---
datafusion/sql/src/select.rs | 2 +-
datafusion/sql/tests/integration_test.rs | 70 +++++++++++++++++++++++++++++
4 files changed, 93 insertions(+), 8 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index d49af378b..63783a110 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1042,8 +1042,9 @@ pub fn project(
Expr::Wildcard => {
projected_expr.extend(expand_wildcard(input_schema, &plan)?)
}
- Expr::QualifiedWildcard { ref qualifier } => projected_expr
- .extend(expand_qualified_wildcard(qualifier, input_schema, &plan)?),
+ Expr::QualifiedWildcard { ref qualifier } => {
+ projected_expr.extend(expand_qualified_wildcard(qualifier, input_schema)?)
+ }
_ => projected_expr
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
}
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 8cf79c612..682d13321 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -150,13 +150,23 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr
let using_columns = plan.using_columns()?;
let columns_to_skip = using_columns
.into_iter()
- // For each USING JOIN condition, only expand to one column in projection
+ // For each USING JOIN condition, only expand to one of each join column in projection
.flat_map(|cols| {
let mut cols = cols.into_iter().collect::<Vec<_>>();
// sort join columns to make sure we consistently keep the same
// qualified column
cols.sort();
- cols.into_iter().skip(1)
+ let mut out_column_names: HashSet<String> = HashSet::new();
+ cols.into_iter()
+ .filter_map(|c| {
+ if out_column_names.contains(&c.name) {
+ Some(c)
+ } else {
+ out_column_names.insert(c.name);
+ None
+ }
+ })
+ .collect::<Vec<_>>()
})
.collect::<HashSet<_>>();
@@ -186,7 +196,6 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr
pub fn expand_qualified_wildcard(
qualifier: &str,
schema: &DFSchema,
- plan: &LogicalPlan,
) -> Result<Vec<Expr>> {
let qualified_fields: Vec<DFField> = schema
.fields_with_qualified(qualifier)
@@ -198,9 +207,14 @@ pub fn expand_qualified_wildcard(
"Invalid qualifier {qualifier}"
)));
}
- let qualifier_schema =
+ let qualified_schema =
DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?;
- expand_wildcard(&qualifier_schema, plan)
+ // if qualified, allow all columns in output (i.e. ignore using column check)
+ Ok(qualified_schema
+ .fields()
+ .iter()
+ .map(|f| Expr::Column(f.qualified_column()))
+ .collect::<Vec<Expr>>())
}
/// (expr, "is the SortExpr for window (either comes from PARTITION BY or ORDER BY columns)")
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 53319772a..9056c1d8a 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -368,7 +368,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let qualifier = format!("{object_name}");
// do not expand from outer schema
- expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan)
+ expand_qualified_wildcard(&qualifier, plan.schema().as_ref())
}
}
}
diff --git a/datafusion/sql/tests/integration_test.rs b/datafusion/sql/tests/integration_test.rs
index a246feda2..8b3d8c4df 100644
--- a/datafusion/sql/tests/integration_test.rs
+++ b/datafusion/sql/tests/integration_test.rs
@@ -395,6 +395,76 @@ fn join_with_ambiguous_column() {
quick_test(sql, expected);
}
+#[test]
+fn using_join_multiple_keys() {
+ let sql = "SELECT * FROM person a join person b using (id, age)";
+ let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
+ b.first_name, b.last_name, b.state, b.salary, b.birth_date, b.😀\
+ \n Inner Join: Using a.id = b.id, a.age = b.age\
+ \n SubqueryAlias: a\
+ \n TableScan: person\
+ \n SubqueryAlias: b\
+ \n TableScan: person";
+ quick_test(sql, expected);
+}
+
+#[test]
+fn using_join_multiple_keys_subquery() {
+ let sql =
+ "SELECT age FROM (SELECT * FROM person a join person b using (id, age, state))";
+ let expected = "Projection: a.age\
+ \n Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
+ b.first_name, b.last_name, b.salary, b.birth_date, b.😀\
+ \n Inner Join: Using a.id = b.id, a.age = b.age, a.state = b.state\
+ \n SubqueryAlias: a\
+ \n TableScan: person\
+ \n SubqueryAlias: b\
+ \n TableScan: person";
+ quick_test(sql, expected);
+}
+
+#[test]
+fn using_join_multiple_keys_qualified_wildcard_select() {
+ let sql = "SELECT a.* FROM person a join person b using (id, age)";
+ let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀\
+ \n Inner Join: Using a.id = b.id, a.age = b.age\
+ \n SubqueryAlias: a\
+ \n TableScan: person\
+ \n SubqueryAlias: b\
+ \n TableScan: person";
+ quick_test(sql, expected);
+}
+
+#[test]
+fn using_join_multiple_keys_select_all_columns() {
+ let sql = "SELECT a.*, b.* FROM person a join person b using (id, age)";
+ let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
+ b.id, b.first_name, b.last_name, b.age, b.state, b.salary, b.birth_date, b.😀\
+ \n Inner Join: Using a.id = b.id, a.age = b.age\
+ \n SubqueryAlias: a\
+ \n TableScan: person\
+ \n SubqueryAlias: b\
+ \n TableScan: person";
+ quick_test(sql, expected);
+}
+
+#[test]
+fn using_join_multiple_keys_multiple_joins() {
+ let sql = "SELECT * FROM person a join person b using (id, age, state) join person c using (id, age, state)";
+ let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
+ b.first_name, b.last_name, b.salary, b.birth_date, b.😀, \
+ c.first_name, c.last_name, c.salary, c.birth_date, c.😀\
+ \n Inner Join: Using a.id = c.id, a.age = c.age, a.state = c.state\
+ \n Inner Join: Using a.id = b.id, a.age = b.age, a.state = b.state\
+ \n SubqueryAlias: a\
+ \n TableScan: person\
+ \n SubqueryAlias: b\
+ \n TableScan: person\
+ \n SubqueryAlias: c\
+ \n TableScan: person";
+ quick_test(sql, expected);
+}
+
#[test]
fn select_with_having() {
let sql = "SELECT id, age