You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this text.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/08 22:38:11 UTC

[arrow-datafusion] branch master updated: Support wildcard select on multiple column using joins (#4840)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 63ae2b9eb Support wildcard select on multiple column using joins (#4840)
63ae2b9eb is described below

commit 63ae2b9ebbe853648492576cda457a3049bbd2dc
Author: Jeffrey <22...@users.noreply.github.com>
AuthorDate: Mon Jan 9 09:38:05 2023 +1100

    Support wildcard select on multiple column using joins (#4840)
---
 datafusion/expr/src/logical_plan/builder.rs |  5 ++-
 datafusion/expr/src/utils.rs                | 24 +++++++---
 datafusion/sql/src/select.rs                |  2 +-
 datafusion/sql/tests/integration_test.rs    | 70 +++++++++++++++++++++++++++++
 4 files changed, 93 insertions(+), 8 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index d49af378b..63783a110 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1042,8 +1042,9 @@ pub fn project(
             Expr::Wildcard => {
                 projected_expr.extend(expand_wildcard(input_schema, &plan)?)
             }
-            Expr::QualifiedWildcard { ref qualifier } => projected_expr
-                .extend(expand_qualified_wildcard(qualifier, input_schema, &plan)?),
+            Expr::QualifiedWildcard { ref qualifier } => {
+                projected_expr.extend(expand_qualified_wildcard(qualifier, input_schema)?)
+            }
             _ => projected_expr
                 .push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
         }
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 8cf79c612..682d13321 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -150,13 +150,23 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr
     let using_columns = plan.using_columns()?;
     let columns_to_skip = using_columns
         .into_iter()
-        // For each USING JOIN condition, only expand to one column in projection
+        // For each USING JOIN condition, only expand to one of each join column in projection
         .flat_map(|cols| {
             let mut cols = cols.into_iter().collect::<Vec<_>>();
             // sort join columns to make sure we consistently keep the same
             // qualified column
             cols.sort();
-            cols.into_iter().skip(1)
+            let mut out_column_names: HashSet<String> = HashSet::new();
+            cols.into_iter()
+                .filter_map(|c| {
+                    if out_column_names.contains(&c.name) {
+                        Some(c)
+                    } else {
+                        out_column_names.insert(c.name);
+                        None
+                    }
+                })
+                .collect::<Vec<_>>()
         })
         .collect::<HashSet<_>>();
 
@@ -186,7 +196,6 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr
 pub fn expand_qualified_wildcard(
     qualifier: &str,
     schema: &DFSchema,
-    plan: &LogicalPlan,
 ) -> Result<Vec<Expr>> {
     let qualified_fields: Vec<DFField> = schema
         .fields_with_qualified(qualifier)
@@ -198,9 +207,14 @@ pub fn expand_qualified_wildcard(
             "Invalid qualifier {qualifier}"
         )));
     }
-    let qualifier_schema =
+    let qualified_schema =
         DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?;
-    expand_wildcard(&qualifier_schema, plan)
+    // if qualified, allow all columns in output (i.e. ignore using column check)
+    Ok(qualified_schema
+        .fields()
+        .iter()
+        .map(|f| Expr::Column(f.qualified_column()))
+        .collect::<Vec<Expr>>())
 }
 
 /// (expr, "is the SortExpr for window (either comes from PARTITION BY or ORDER BY columns)")
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 53319772a..9056c1d8a 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -368,7 +368,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
                 let qualifier = format!("{object_name}");
                 // do not expand from outer schema
-                expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan)
+                expand_qualified_wildcard(&qualifier, plan.schema().as_ref())
             }
         }
     }
diff --git a/datafusion/sql/tests/integration_test.rs b/datafusion/sql/tests/integration_test.rs
index a246feda2..8b3d8c4df 100644
--- a/datafusion/sql/tests/integration_test.rs
+++ b/datafusion/sql/tests/integration_test.rs
@@ -395,6 +395,76 @@ fn join_with_ambiguous_column() {
     quick_test(sql, expected);
 }
 
+#[test]
+fn using_join_multiple_keys() {
+    let sql = "SELECT * FROM person a join person b using (id, age)";
+    let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
+        b.first_name, b.last_name, b.state, b.salary, b.birth_date, b.😀\
+                        \n  Inner Join: Using a.id = b.id, a.age = b.age\
+                        \n    SubqueryAlias: a\
+                        \n      TableScan: person\
+                        \n    SubqueryAlias: b\
+                        \n      TableScan: person";
+    quick_test(sql, expected);
+}
+
+#[test]
+fn using_join_multiple_keys_subquery() {
+    let sql =
+        "SELECT age FROM (SELECT * FROM person a join person b using (id, age, state))";
+    let expected = "Projection: a.age\
+                        \n  Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
+        b.first_name, b.last_name, b.salary, b.birth_date, b.😀\
+                        \n    Inner Join: Using a.id = b.id, a.age = b.age, a.state = b.state\
+                        \n      SubqueryAlias: a\
+                        \n        TableScan: person\
+                        \n      SubqueryAlias: b\
+                        \n        TableScan: person";
+    quick_test(sql, expected);
+}
+
+#[test]
+fn using_join_multiple_keys_qualified_wildcard_select() {
+    let sql = "SELECT a.* FROM person a join person b using (id, age)";
+    let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀\
+                        \n  Inner Join: Using a.id = b.id, a.age = b.age\
+                        \n    SubqueryAlias: a\
+                        \n      TableScan: person\
+                        \n    SubqueryAlias: b\
+                        \n      TableScan: person";
+    quick_test(sql, expected);
+}
+
+#[test]
+fn using_join_multiple_keys_select_all_columns() {
+    let sql = "SELECT a.*, b.* FROM person a join person b using (id, age)";
+    let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
+        b.id, b.first_name, b.last_name, b.age, b.state, b.salary, b.birth_date, b.😀\
+                        \n  Inner Join: Using a.id = b.id, a.age = b.age\
+                        \n    SubqueryAlias: a\
+                        \n      TableScan: person\
+                        \n    SubqueryAlias: b\
+                        \n      TableScan: person";
+    quick_test(sql, expected);
+}
+
+#[test]
+fn using_join_multiple_keys_multiple_joins() {
+    let sql = "SELECT * FROM person a join person b using (id, age, state) join person c using (id, age, state)";
+    let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
+        b.first_name, b.last_name, b.salary, b.birth_date, b.😀, \
+        c.first_name, c.last_name, c.salary, c.birth_date, c.😀\
+                        \n  Inner Join: Using a.id = c.id, a.age = c.age, a.state = c.state\
+                        \n    Inner Join: Using a.id = b.id, a.age = b.age, a.state = b.state\
+                        \n      SubqueryAlias: a\
+                        \n        TableScan: person\
+                        \n      SubqueryAlias: b\
+                        \n        TableScan: person\
+                        \n    SubqueryAlias: c\
+                        \n      TableScan: person";
+    quick_test(sql, expected);
+}
+
 #[test]
 fn select_with_having() {
     let sql = "SELECT id, age