You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/06/01 06:41:12 UTC

[arrow-datafusion] branch main updated: Add SELECT * EXCLUDE, SELECT * EXCEPT support (#6481)

This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 25df88762e Add SELECT * EXCLUDE, SELECT * EXCEPT support (#6481)
25df88762e is described below

commit 25df88762ef57330de9f0da4853fb4854c3e48d9
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Thu Jun 1 09:41:06 2023 +0300

    Add SELECT * EXCLUDE, SELECT * EXCEPT support (#6481)
    
    * Add Exclude support in select
    
    * comment fix
    
    * Add new test, for showing different syntax during single column exclusion
    
    * Add support for EXCEPT
    
    * Update tests, to reflect better which columns table contains
    
    * Add duplicate column check to EXCEPT and EXCLUDE
    
    * Update test
    
    ---------
    
    Co-authored-by: berkaysynnada <be...@synnada.ai>
---
 .../core/tests/sqllogictests/test_files/select.slt |  86 ++++++++++++++
 datafusion/expr/src/logical_plan/builder.rs        |   9 +-
 datafusion/expr/src/utils.rs                       | 132 ++++++++++++++++-----
 .../optimizer/src/replace_distinct_aggregate.rs    |   2 +-
 datafusion/sql/src/select.rs                       |  29 +++--
 5 files changed, 209 insertions(+), 49 deletions(-)

diff --git a/datafusion/core/tests/sqllogictests/test_files/select.slt b/datafusion/core/tests/sqllogictests/test_files/select.slt
index 399c212210..cc8828ef87 100644
--- a/datafusion/core/tests/sqllogictests/test_files/select.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/select.slt
@@ -759,6 +759,92 @@ SELECT a FROM annotated_data_finite2
 0
 0
 
+# create a table to test SELECT * EXCLUDE, SELECT * EXCEPT syntax
+statement ok
+CREATE TABLE table1 (
+  a int,
+  b int,
+  c int,
+  d int
+) as values
+  (1, 10, 100, 1000),
+  (2, 20, 200, 2000);
+
+# Below query should emit all the columns except a and b
+# The syntax is as follows: `SELECT * EXCLUDE(<col_name>, ...)`
+# when only single column is excluded, we can either use
+# `EXCLUDE <col_name>` or `EXCLUDE(<col_name>)` syntax
+query II
+SELECT * EXCLUDE(b) FROM (
+  SELECT * EXCLUDE a
+    FROM table1
+    ORDER BY c
+    LIMIT 5
+  )
+----
+100 1000
+200 2000
+
+# Below query should emit all the columns except a and b
+# To exclude some columns, we can use except clause also,
+# the behavior is similar to EXCLUDE clause.
+# The syntax is as follows: `SELECT * EXCEPT(<col_name>, ...)`
+query II
+SELECT * EXCEPT(a, b)
+FROM table1
+ORDER BY c
+LIMIT 5
+----
+100 1000
+200 2000
+
+# below query should emit all the columns except a and b
+query II
+SELECT * EXCLUDE(a, b)
+FROM table1
+ORDER BY c
+LIMIT 5
+----
+100 1000
+200 2000
+
+# when wildcard is prepended with table name, exclude should still work
+# below query should emit all the columns except a and b
+query II
+SELECT table1.* EXCLUDE(a, b)
+FROM table1
+ORDER BY c
+LIMIT 5
+----
+100 1000
+200 2000
+
+# Trying to exclude non-existing column should give error
+statement error DataFusion error: Schema error: No field named e. Valid fields are table1.a, table1.b, table1.c, table1.d.
+SELECT * EXCLUDE e
+FROM table1
+
+# similarly, except should raise error if excluded column is not in the table
+statement error DataFusion error: Schema error: No field named e. Valid fields are table1.a, table1.b, table1.c, table1.d.
+SELECT * EXCEPT(e)
+FROM table1
+
+# EXCEPT, or EXCLUDE can only be used after wildcard *
+# below query should give 5 columns, a1, b1, b, c, d
+query IIIII
+SELECT a as a1, b as b1, * EXCEPT(a)
+FROM table1
+----
+1 10 10 100 1000
+2 20 20 200 2000
+
+# EXCEPT, or EXCLUDE shouldn't contain duplicate column names
+statement error DataFusion error: Error during planning: EXCLUDE or EXCEPT contains duplicate column names
+SELECT * EXCLUDE(a, a)
+FROM table1
 
 statement ok
 drop table annotated_data_finite2;
+
+statement ok
+drop table t;
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 741ed3a24b..606b990cfe 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1236,11 +1236,10 @@ pub fn project(
         let e = e.into();
         match e {
             Expr::Wildcard => {
-                projected_expr.extend(expand_wildcard(input_schema, &plan)?)
-            }
-            Expr::QualifiedWildcard { ref qualifier } => {
-                projected_expr.extend(expand_qualified_wildcard(qualifier, input_schema)?)
+                projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
             }
+            Expr::QualifiedWildcard { ref qualifier } => projected_expr
+                .extend(expand_qualified_wildcard(qualifier, input_schema, None)?),
             _ => projected_expr
                 .push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
         }
@@ -1328,7 +1327,7 @@ pub fn wrap_projection_for_join_if_necessary(
 
     let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
     let plan = if need_project {
-        let mut projection = expand_wildcard(input_schema, &input)?;
+        let mut projection = expand_wildcard(input_schema, &input, None)?;
         let join_key_items = alias_join_keys
             .iter()
             .flat_map(|expr| expr.try_into_col().is_err().then_some(expr))
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 1d7aa536f9..c2eabea857 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -37,6 +37,7 @@ use datafusion_common::{
     Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
     TableReference,
 };
+use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, WildcardAdditionalOptions};
 use std::cmp::Ordering;
 use std::collections::HashSet;
 use std::sync::Arc;
@@ -315,10 +316,84 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
     })
 }
 
+/// Find excluded columns in the schema, if any
+/// SELECT * EXCLUDE(col1, col2), would return `vec![col1, col2]`
+fn get_excluded_columns(
+    opt_exclude: Option<ExcludeSelectItem>,
+    opt_except: Option<ExceptSelectItem>,
+    schema: &DFSchema,
+    qualifier: &Option<TableReference>,
+) -> Result<Vec<Column>> {
+    let mut idents = vec![];
+    if let Some(excepts) = opt_except {
+        idents.push(excepts.first_element);
+        idents.extend(excepts.additional_elements);
+    }
+    if let Some(exclude) = opt_exclude {
+        match exclude {
+            ExcludeSelectItem::Single(ident) => idents.push(ident),
+            ExcludeSelectItem::Multiple(idents_inner) => idents.extend(idents_inner),
+        }
+    }
+    // Excluded columns should be unique
+    let n_elem = idents.len();
+    let unique_idents = idents.into_iter().collect::<HashSet<_>>();
+    // If the HashSet size and the vector length differ, some of the excluded columns
+    // are not unique. In this case, return an error.
+    if n_elem != unique_idents.len() {
+        return Err(DataFusionError::Plan(
+            "EXCLUDE or EXCEPT contains duplicate column names".to_string(),
+        ));
+    }
+
+    let mut result = vec![];
+    for ident in unique_idents.into_iter() {
+        let col_name = ident.value.as_str();
+        let field = if let Some(qualifier) = qualifier {
+            schema.field_with_qualified_name(qualifier, col_name)?
+        } else {
+            schema.field_with_unqualified_name(col_name)?
+        };
+        result.push(field.qualified_column())
+    }
+    Ok(result)
+}
+
+/// Returns all `Expr`s in the schema, except the `Column`s in the `columns_to_skip`
+fn get_exprs_except_skipped(
+    schema: &DFSchema,
+    columns_to_skip: HashSet<Column>,
+) -> Vec<Expr> {
+    if columns_to_skip.is_empty() {
+        schema
+            .fields()
+            .iter()
+            .map(|f| Expr::Column(f.qualified_column()))
+            .collect::<Vec<Expr>>()
+    } else {
+        schema
+            .fields()
+            .iter()
+            .filter_map(|f| {
+                let col = f.qualified_column();
+                if !columns_to_skip.contains(&col) {
+                    Some(Expr::Column(col))
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<Expr>>()
+    }
+}
+
 /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
-pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr>> {
+pub fn expand_wildcard(
+    schema: &DFSchema,
+    plan: &LogicalPlan,
+    wildcard_options: Option<WildcardAdditionalOptions>,
+) -> Result<Vec<Expr>> {
     let using_columns = plan.using_columns()?;
-    let columns_to_skip = using_columns
+    let mut columns_to_skip = using_columns
         .into_iter()
         // For each USING JOIN condition, only expand to one of each join column in projection
         .flat_map(|cols| {
@@ -339,33 +414,26 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr
                 .collect::<Vec<_>>()
         })
         .collect::<HashSet<_>>();
-
-    if columns_to_skip.is_empty() {
-        Ok(schema
-            .fields()
-            .iter()
-            .map(|f| Expr::Column(f.qualified_column()))
-            .collect::<Vec<Expr>>())
+    let excluded_columns = if let Some(WildcardAdditionalOptions {
+        opt_exclude,
+        opt_except,
+        ..
+    }) = wildcard_options
+    {
+        get_excluded_columns(opt_exclude, opt_except, schema, &None)?
     } else {
-        Ok(schema
-            .fields()
-            .iter()
-            .filter_map(|f| {
-                let col = f.qualified_column();
-                if !columns_to_skip.contains(&col) {
-                    Some(Expr::Column(col))
-                } else {
-                    None
-                }
-            })
-            .collect::<Vec<Expr>>())
-    }
+        vec![]
+    };
+    // Add each excluded `Column` to columns_to_skip
+    columns_to_skip.extend(excluded_columns);
+    Ok(get_exprs_except_skipped(schema, columns_to_skip))
 }
 
 /// Resolves an `Expr::Wildcard` to a collection of qualified `Expr::Column`'s.
 pub fn expand_qualified_wildcard(
     qualifier: &str,
     schema: &DFSchema,
+    wildcard_options: Option<WildcardAdditionalOptions>,
 ) -> Result<Vec<Expr>> {
     let qualifier = TableReference::from(qualifier);
     let qualified_fields: Vec<DFField> = schema
@@ -380,12 +448,20 @@ pub fn expand_qualified_wildcard(
     }
     let qualified_schema =
         DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?;
-    // if qualified, allow all columns in output (i.e. ignore using column check)
-    Ok(qualified_schema
-        .fields()
-        .iter()
-        .map(|f| Expr::Column(f.qualified_column()))
-        .collect::<Vec<Expr>>())
+    let excluded_columns = if let Some(WildcardAdditionalOptions {
+        opt_exclude,
+        opt_except,
+        ..
+    }) = wildcard_options
+    {
+        get_excluded_columns(opt_exclude, opt_except, schema, &Some(qualifier))?
+    } else {
+        vec![]
+    };
+    // Add each excluded `Column` to columns_to_skip
+    let mut columns_to_skip = HashSet::new();
+    columns_to_skip.extend(excluded_columns);
+    Ok(get_exprs_except_skipped(&qualified_schema, columns_to_skip))
 }
 
 /// (expr, "is the SortExpr for window (either comes from PARTITION BY or ORDER BY columns)")
diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs
index 2a604bc3ff..f58d4b1597 100644
--- a/datafusion/optimizer/src/replace_distinct_aggregate.rs
+++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs
@@ -53,7 +53,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Distinct(Distinct { input }) => {
-                let group_expr = expand_wildcard(input.schema(), input)?;
+                let group_expr = expand_wildcard(input.schema(), input, None)?;
                 let aggregate = LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
                     input.clone(),
                     group_expr,
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 612a38d6f0..f12830df42 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -352,7 +352,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 Ok(vec![expr])
             }
             SelectItem::Wildcard(options) => {
-                Self::check_wildcard_options(options)?;
+                Self::check_wildcard_options(&options)?;
 
                 if empty_from {
                     return Err(DataFusionError::Plan(
@@ -360,34 +360,33 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     ));
                 }
                 // do not expand from outer schema
-                expand_wildcard(plan.schema().as_ref(), plan)
+                expand_wildcard(plan.schema().as_ref(), plan, Some(options))
             }
             SelectItem::QualifiedWildcard(ref object_name, options) => {
-                Self::check_wildcard_options(options)?;
-
+                Self::check_wildcard_options(&options)?;
                 let qualifier = format!("{object_name}");
                 // do not expand from outer schema
-                expand_qualified_wildcard(&qualifier, plan.schema().as_ref())
+                expand_qualified_wildcard(
+                    &qualifier,
+                    plan.schema().as_ref(),
+                    Some(options),
+                )
             }
         }
     }
 
-    fn check_wildcard_options(options: WildcardAdditionalOptions) -> Result<()> {
+    fn check_wildcard_options(options: &WildcardAdditionalOptions) -> Result<()> {
         let WildcardAdditionalOptions {
-            opt_exclude,
-            opt_except,
+            // opt_exclude and opt_except are handled during wildcard expansion
+            opt_exclude: _opt_exclude,
+            opt_except: _opt_except,
             opt_rename,
             opt_replace,
         } = options;
 
-        if opt_exclude.is_some()
-            || opt_except.is_some()
-            || opt_rename.is_some()
-            || opt_replace.is_some()
-        {
+        if opt_rename.is_some() || opt_replace.is_some() {
             Err(DataFusionError::NotImplemented(
-                "wildcard * with EXCLUDE, EXCEPT, RENAME or REPLACE not supported "
-                    .to_string(),
+                "wildcard * with RENAME or REPLACE not supported ".to_string(),
             ))
         } else {
             Ok(())