Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/13 13:46:40 UTC

[arrow-datafusion] branch master updated: dedup using join column in wildcard expansion (#678)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 75a376f  dedup using join column in wildcard expansion (#678)
75a376f is described below

commit 75a376f0ff2e8236c07a3b0a16374b7e3855c194
Author: QP Hou <qp...@scribd.com>
AuthorDate: Tue Jul 13 06:46:31 2021 -0700

    dedup using join column in wildcard expansion (#678)
    
    * dedup using join column in wildcard expansion
    
    * reuse expand_wildcard in logical plan builder
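
To make the intent concrete: for `SELECT * FROM t1 JOIN t2 USING (id)`, the wildcard now expands the join key `id` to a single qualified column rather than once per input. The following standalone Rust sketch (toy string data and hypothetical table/column names, not the DataFusion `Column`/`DFSchema` types) illustrates the rule `expand_wildcard` applies: sort each USING column group, keep the first qualified copy, and skip the rest.

    use std::collections::HashSet;

    fn main() {
        // Hypothetical qualified columns for `SELECT * FROM t1 JOIN t2 USING (id)`.
        let fields = vec!["t1.id", "t1.name", "t2.id", "t2.name"];
        // Each USING key contributes one group of equivalent columns.
        let using_columns = vec![vec!["t2.id", "t1.id"]];

        let columns_to_skip: HashSet<&str> = using_columns
            .into_iter()
            .flat_map(|mut cols| {
                // Sort so the kept column is chosen deterministically.
                cols.sort();
                cols.into_iter().skip(1)
            })
            .collect();

        let projected: Vec<&str> = fields
            .into_iter()
            .filter(|col| !columns_to_skip.contains(col))
            .collect();

        // `id` appears once, qualified by the table that sorts first.
        assert_eq!(projected, vec!["t1.id", "t1.name", "t2.name"]);
    }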
---
 datafusion/src/logical_plan/builder.rs | 72 +++++++++++++++++++++++++++++++---
 datafusion/src/logical_plan/expr.rs    |  2 +-
 datafusion/src/logical_plan/mod.rs     |  2 +-
 datafusion/src/logical_plan/plan.rs    |  2 +-
 datafusion/src/sql/planner.rs          | 34 +++++++++++++---
 datafusion/src/sql/utils.rs            | 14 +------
 6 files changed, 99 insertions(+), 27 deletions(-)

diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 41f29c4..85c4aea 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -17,7 +17,10 @@
 
 //! This module provides a builder for creating LogicalPlans
 
-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
 use arrow::{
     datatypes::{Schema, SchemaRef},
@@ -220,10 +223,7 @@ impl LogicalPlanBuilder {
         for e in expr {
             match e {
                 Expr::Wildcard => {
-                    (0..input_schema.fields().len()).for_each(|i| {
-                        projected_expr
-                            .push(Expr::Column(input_schema.field(i).qualified_column()))
-                    });
+                    projected_expr.extend(expand_wildcard(input_schema, &self.plan)?)
                 }
                 _ => projected_expr
                     .push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
@@ -508,6 +508,47 @@ pub fn union_with_alias(
     })
 }
 
+/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
+pub(crate) fn expand_wildcard(
+    schema: &DFSchema,
+    plan: &LogicalPlan,
+) -> Result<Vec<Expr>> {
+    let using_columns = plan.using_columns()?;
+    let columns_to_skip = using_columns
+        .into_iter()
+        // For each USING JOIN condition, only expand to one column in projection
+        .map(|cols| {
+            let mut cols = cols.into_iter().collect::<Vec<_>>();
+            // sort join columns to make sure we consistently keep the same
+            // qualified column
+            cols.sort();
+            cols.into_iter().skip(1)
+        })
+        .flatten()
+        .collect::<HashSet<_>>();
+
+    if columns_to_skip.is_empty() {
+        Ok(schema
+            .fields()
+            .iter()
+            .map(|f| Expr::Column(f.qualified_column()))
+            .collect::<Vec<Expr>>())
+    } else {
+        Ok(schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                let col = f.qualified_column();
+                if !columns_to_skip.contains(&col) {
+                    Some(Expr::Column(col))
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<Expr>>())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::{DataType, Field};
@@ -588,6 +629,27 @@ mod tests {
     }
 
     #[test]
+    fn plan_using_join_wildcard_projection() -> Result<()> {
+        let t2 = LogicalPlanBuilder::scan_empty(Some("t2"), &employee_schema(), None)?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::scan_empty(Some("t1"), &employee_schema(), None)?
+            .join_using(&t2, JoinType::Inner, vec!["id"])?
+            .project(vec![Expr::Wildcard])?
+            .build()?;
+
+        // id column should only show up once in projection
+        let expected = "Projection: #t1.id, #t1.first_name, #t1.last_name, #t1.state, #t1.salary, #t2.first_name, #t2.last_name, #t2.state, #t2.salary\
+        \n  Join: Using #t1.id = #t2.id\
+        \n    TableScan: t1 projection=None\
+        \n    TableScan: t2 projection=None";
+
+        assert_eq!(expected, format!("{:?}", plan));
+
+        Ok(())
+    }
+
+    #[test]
     fn plan_builder_union_combined_single_union() -> Result<()> {
         let plan = LogicalPlanBuilder::scan_empty(
             Some("employee_csv"),
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 59c9979..2eee140 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -34,7 +34,7 @@ use std::fmt;
 use std::sync::Arc;
 
 /// A named reference to a qualified field in a schema.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
 pub struct Column {
     /// relation/table name.
     pub relation: Option<String>,
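
The `PartialOrd`/`Ord` derives added to `Column` above are what let the builder call `sort()` on a USING column group and keep a stable choice of qualified column. A toy sketch of what the derive provides (hypothetical `QualifiedColumn` type, not the real `Column`): the derived ordering compares fields in declaration order, so `relation` is compared before `name`.

    #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
    struct QualifiedColumn {
        relation: Option<String>,
        name: String,
    }

    fn main() {
        let mut cols = vec![
            QualifiedColumn { relation: Some("t2".into()), name: "id".into() },
            QualifiedColumn { relation: Some("t1".into()), name: "id".into() },
        ];
        // Derived Ord compares `relation` first, then `name`, so "t1" sorts
        // ahead of "t2" and the column kept after `skip(1)` is stable across runs.
        cols.sort();
        assert_eq!(cols[0].relation.as_deref(), Some("t1"));
    }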
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 2c751ab..f381e31 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -21,7 +21,7 @@
 //! Logical query plans can then be optimized and executed directly, or translated into
 //! physical query plans and executed.
 
-mod builder;
+pub(crate) mod builder;
 mod dfschema;
 mod display;
 mod expr;
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index b954b6a..2504dfa 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -25,8 +25,8 @@ use crate::error::DataFusionError;
 use crate::logical_plan::dfschema::DFSchemaRef;
 use crate::sql::parser::FileType;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use std::collections::HashSet;
 use std::{
+    collections::HashSet,
     fmt::{self, Display},
     sync::Arc,
 };
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index f89ba3f..41b4e20 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -27,8 +27,9 @@ use crate::datasource::TableProvider;
 use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
 use crate::logical_plan::Expr::Alias;
 use crate::logical_plan::{
-    and, col, lit, normalize_col, union_with_alias, Column, DFSchema, Expr, LogicalPlan,
-    LogicalPlanBuilder, Operator, PlanType, StringifiedPlan, ToDFSchema,
+    and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
+    DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, StringifiedPlan,
+    ToDFSchema,
 };
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
@@ -56,7 +57,7 @@ use sqlparser::parser::ParserError::ParserError;
 use super::{
     parser::DFParser,
     utils::{
-        can_columns_satisfy_exprs, expand_wildcard, expr_as_column_expr, extract_aliases,
+        can_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
         find_aggregate_exprs, find_column_exprs, find_window_exprs,
         group_window_expr_by_sort_keys, rebase_expr, resolve_aliases_to_exprs,
         resolve_positions_to_exprs,
@@ -687,9 +688,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .iter()
             .map(|expr| self.sql_select_to_rex(expr, input_schema))
             .collect::<Result<Vec<Expr>>>()?
-            .iter()
-            .flat_map(|expr| expand_wildcard(expr, input_schema))
-            .map(|expr| normalize_col(expr, plan))
+            .into_iter()
+            .map(|expr| {
+                Ok(match expr {
+                    Expr::Wildcard => expand_wildcard(input_schema, plan)?,
+                    _ => vec![normalize_col(expr, plan)?],
+                })
+            })
+            .flat_map(|res| match res {
+                Ok(v) => v.into_iter().map(Ok).collect(),
+                Err(e) => vec![Err(e)],
+            })
             .collect::<Result<Vec<Expr>>>()
     }
 
@@ -2774,6 +2783,19 @@ mod tests {
     }
 
     #[test]
+    fn project_wildcard_on_join_with_using() {
+        let sql = "SELECT * \
+            FROM lineitem \
+            JOIN lineitem as lineitem2 \
+            USING (l_item_id)";
+        let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\
+        \n  Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\
+        \n    TableScan: lineitem projection=None\
+        \n    TableScan: lineitem2 projection=None";
+        quick_test(sql, expected);
+    }
+
+    #[test]
     fn equijoin_explicit_syntax_3_tables() {
         let sql = "SELECT id, order_id, l_description \
             FROM person \
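
The projection change above maps each SELECT item to a `Result<Vec<Expr>>` (a wildcard may expand to many columns) and then flattens while keeping error propagation, so the final `collect::<Result<Vec<Expr>>>()` still short-circuits on the first error. A minimal standalone sketch of that pattern, using a hypothetical `expand` function and a plain `String` error in place of `DataFusionError`:

    fn expand(x: i32) -> Result<Vec<i32>, String> {
        if x < 0 {
            Err(format!("negative input: {}", x))
        } else {
            Ok(vec![x, x * 10])
        }
    }

    fn main() {
        let out: Result<Vec<i32>, String> = vec![1, 2, 3]
            .into_iter()
            .map(expand)
            // Turn each Ok(Vec<T>) into a stream of Ok(T); pass an Err through
            // as a single item so collect() can short-circuit on it.
            .flat_map(|res| match res {
                Ok(v) => v.into_iter().map(Ok).collect::<Vec<_>>(),
                Err(e) => vec![Err(e)],
            })
            .collect();
        assert_eq!(out, Ok(vec![1, 10, 2, 20, 3, 30]));

        // An Err anywhere in the input yields the first error instead of a Vec.
        let bad: Result<Vec<i32>, String> = vec![1, -2]
            .into_iter()
            .map(expand)
            .flat_map(|res| match res {
                Ok(v) => v.into_iter().map(Ok).collect::<Vec<_>>(),
                Err(e) => vec![Err(e)],
            })
            .collect();
        assert_eq!(bad, Err("negative input: -2".to_string()));
    }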
diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs
index 2824336..41bcd20 100644
--- a/datafusion/src/sql/utils.rs
+++ b/datafusion/src/sql/utils.rs
@@ -17,7 +17,7 @@
 
 //! SQL Utility Functions
 
-use crate::logical_plan::{DFSchema, Expr, LogicalPlan};
+use crate::logical_plan::{Expr, LogicalPlan};
 use crate::scalar::ScalarValue;
 use crate::{
     error::{DataFusionError, Result},
@@ -25,18 +25,6 @@ use crate::{
 };
 use std::collections::HashMap;
 
-/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
-pub(crate) fn expand_wildcard(expr: &Expr, schema: &DFSchema) -> Vec<Expr> {
-    match expr {
-        Expr::Wildcard => schema
-            .fields()
-            .iter()
-            .map(|f| Expr::Column(f.qualified_column()))
-            .collect::<Vec<Expr>>(),
-        _ => vec![expr.clone()],
-    }
-}
-
 /// Collect all deeply nested `Expr::AggregateFunction` and
 /// `Expr::AggregateUDF`. They are returned in order of occurrence (depth
 /// first), with duplicates omitted.