You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/06/01 06:41:12 UTC
[arrow-datafusion] branch main updated: Add SELECT * EXCLUDE, SELECT * EXCEPT support (#6481)
This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 25df88762e Add SELECT * EXCLUDE, SELECT * EXCEPT support (#6481)
25df88762e is described below
commit 25df88762ef57330de9f0da4853fb4854c3e48d9
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Thu Jun 1 09:41:06 2023 +0300
Add SELECT * EXCLUDE, SELECT * EXCEPT support (#6481)
* Add Exclude support in select
* comment fix
* Add new test, for showing different syntax during single column exclusion
* Add support for EXCEPT
* Update tests, to reflect better which columns table contains
* Add duplicate column check to EXCEPT and EXCLUDE
* Update test
---------
Co-authored-by: berkaysynnada <be...@synnada.ai>
---
.../core/tests/sqllogictests/test_files/select.slt | 86 ++++++++++++++
datafusion/expr/src/logical_plan/builder.rs | 9 +-
datafusion/expr/src/utils.rs | 132 ++++++++++++++++-----
.../optimizer/src/replace_distinct_aggregate.rs | 2 +-
datafusion/sql/src/select.rs | 29 +++--
5 files changed, 209 insertions(+), 49 deletions(-)
diff --git a/datafusion/core/tests/sqllogictests/test_files/select.slt b/datafusion/core/tests/sqllogictests/test_files/select.slt
index 399c212210..cc8828ef87 100644
--- a/datafusion/core/tests/sqllogictests/test_files/select.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/select.slt
@@ -759,6 +759,92 @@ SELECT a FROM annotated_data_finite2
0
0
+# create a table to test SELECT * EXCLUDE, SELECT * EXCEPT syntax
+statement ok
+CREATE TABLE table1 (
+ a int,
+ b int,
+ c int,
+ d int
+) as values
+ (1, 10, 100, 1000),
+ (2, 20, 200, 2000);
+
+# Below query should emit all the columns except a and b
+# The syntax is as follows: `SELECT * EXCLUDE(<col_name>, ...)`
+# when only single column is excluded, we can either use
+# `EXCLUDE <col_name>` or `EXCLUDE(<col_name>)` syntax
+query II
+SELECT * EXCLUDE(b) FROM (
+ SELECT * EXCLUDE a
+ FROM table1
+ ORDER BY c
+ LIMIT 5
+ )
+----
+100 1000
+200 2000
+
+# Below query should emit all the columns except a and b
+# To exclude some columns, we can use except clause also,
+# the behavior is similar to EXCLUDE clause.
+# The syntax is as follows: `SELECT * EXCEPT(<col_name>, ...)`
+query II
+SELECT * EXCEPT(a, b)
+FROM table1
+ORDER BY c
+LIMIT 5
+----
+100 1000
+200 2000
+
+# below query should emit all the columns except a and b
+query II
+SELECT * EXCLUDE(a, b)
+FROM table1
+ORDER BY c
+LIMIT 5
+----
+100 1000
+200 2000
+
+# when wildcard is prepended with table name, exclude should still work
+# below query should emit all the columns except a and b
+query II
+SELECT table1.* EXCLUDE(a, b)
+FROM table1
+ORDER BY c
+LIMIT 5
+----
+100 1000
+200 2000
+
+# Trying to exclude non-existing column should give error
+statement error DataFusion error: Schema error: No field named e. Valid fields are table1.a, table1.b, table1.c, table1.d.
+SELECT * EXCLUDE e
+FROM table1
+
+# similarly, except should raise error if excluded column is not in the table
+statement error DataFusion error: Schema error: No field named e. Valid fields are table1.a, table1.b, table1.c, table1.d.
+SELECT * EXCEPT(e)
+FROM table1
+
+# EXCEPT, or EXCLUDE can only be used after wildcard *
+# below query should give 5 columns: a1, b1, b, c, d
+query IIIII
+SELECT a as a1, b as b1, * EXCEPT(a)
+FROM table1
+----
+1 10 10 100 1000
+2 20 20 200 2000
+
+# EXCEPT, or EXCLUDE shouldn't contain duplicate column names
+statement error DataFusion error: Error during planning: EXCLUDE or EXCEPT contains duplicate column names
+SELECT * EXCLUDE(a, a)
+FROM table1
statement ok
drop table annotated_data_finite2;
+
+statement ok
+drop table t;
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 741ed3a24b..606b990cfe 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1236,11 +1236,10 @@ pub fn project(
let e = e.into();
match e {
Expr::Wildcard => {
- projected_expr.extend(expand_wildcard(input_schema, &plan)?)
- }
- Expr::QualifiedWildcard { ref qualifier } => {
- projected_expr.extend(expand_qualified_wildcard(qualifier, input_schema)?)
+ projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
}
+ Expr::QualifiedWildcard { ref qualifier } => projected_expr
+ .extend(expand_qualified_wildcard(qualifier, input_schema, None)?),
_ => projected_expr
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
}
@@ -1328,7 +1327,7 @@ pub fn wrap_projection_for_join_if_necessary(
let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
let plan = if need_project {
- let mut projection = expand_wildcard(input_schema, &input)?;
+ let mut projection = expand_wildcard(input_schema, &input, None)?;
let join_key_items = alias_join_keys
.iter()
.flat_map(|expr| expr.try_into_col().is_err().then_some(expr))
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 1d7aa536f9..c2eabea857 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -37,6 +37,7 @@ use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
TableReference,
};
+use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, WildcardAdditionalOptions};
use std::cmp::Ordering;
use std::collections::HashSet;
use std::sync::Arc;
@@ -315,10 +316,84 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
})
}
+/// Find excluded columns in the schema, if any
+/// SELECT * EXCLUDE(col1, col2), would return `vec![col1, col2]`
+fn get_excluded_columns(
+ opt_exclude: Option<ExcludeSelectItem>,
+ opt_except: Option<ExceptSelectItem>,
+ schema: &DFSchema,
+ qualifier: &Option<TableReference>,
+) -> Result<Vec<Column>> {
+ let mut idents = vec![];
+ if let Some(excepts) = opt_except {
+ idents.push(excepts.first_element);
+ idents.extend(excepts.additional_elements);
+ }
+ if let Some(exclude) = opt_exclude {
+ match exclude {
+ ExcludeSelectItem::Single(ident) => idents.push(ident),
+ ExcludeSelectItem::Multiple(idents_inner) => idents.extend(idents_inner),
+ }
+ }
+ // Excluded columns should be unique
+ let n_elem = idents.len();
+ let unique_idents = idents.into_iter().collect::<HashSet<_>>();
+ // If the HashSet size and the vector length differ, some of the excluded columns
+ // are not unique; in that case return an error.
+ if n_elem != unique_idents.len() {
+ return Err(DataFusionError::Plan(
+ "EXCLUDE or EXCEPT contains duplicate column names".to_string(),
+ ));
+ }
+
+ let mut result = vec![];
+ for ident in unique_idents.into_iter() {
+ let col_name = ident.value.as_str();
+ let field = if let Some(qualifier) = qualifier {
+ schema.field_with_qualified_name(qualifier, col_name)?
+ } else {
+ schema.field_with_unqualified_name(col_name)?
+ };
+ result.push(field.qualified_column())
+ }
+ Ok(result)
+}
+
+/// Returns all `Expr`s in the schema, except the `Column`s in the `columns_to_skip`
+fn get_exprs_except_skipped(
+ schema: &DFSchema,
+ columns_to_skip: HashSet<Column>,
+) -> Vec<Expr> {
+ if columns_to_skip.is_empty() {
+ schema
+ .fields()
+ .iter()
+ .map(|f| Expr::Column(f.qualified_column()))
+ .collect::<Vec<Expr>>()
+ } else {
+ schema
+ .fields()
+ .iter()
+ .filter_map(|f| {
+ let col = f.qualified_column();
+ if !columns_to_skip.contains(&col) {
+ Some(Expr::Column(col))
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<Expr>>()
+ }
+}
+
/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
-pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr>> {
+pub fn expand_wildcard(
+ schema: &DFSchema,
+ plan: &LogicalPlan,
+ wildcard_options: Option<WildcardAdditionalOptions>,
+) -> Result<Vec<Expr>> {
let using_columns = plan.using_columns()?;
- let columns_to_skip = using_columns
+ let mut columns_to_skip = using_columns
.into_iter()
// For each USING JOIN condition, only expand to one of each join column in projection
.flat_map(|cols| {
@@ -339,33 +414,26 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr
.collect::<Vec<_>>()
})
.collect::<HashSet<_>>();
-
- if columns_to_skip.is_empty() {
- Ok(schema
- .fields()
- .iter()
- .map(|f| Expr::Column(f.qualified_column()))
- .collect::<Vec<Expr>>())
+ let excluded_columns = if let Some(WildcardAdditionalOptions {
+ opt_exclude,
+ opt_except,
+ ..
+ }) = wildcard_options
+ {
+ get_excluded_columns(opt_exclude, opt_except, schema, &None)?
} else {
- Ok(schema
- .fields()
- .iter()
- .filter_map(|f| {
- let col = f.qualified_column();
- if !columns_to_skip.contains(&col) {
- Some(Expr::Column(col))
- } else {
- None
- }
- })
- .collect::<Vec<Expr>>())
- }
+ vec![]
+ };
+ // Add each excluded `Column` to columns_to_skip
+ columns_to_skip.extend(excluded_columns);
+ Ok(get_exprs_except_skipped(schema, columns_to_skip))
}
/// Resolves an `Expr::Wildcard` to a collection of qualified `Expr::Column`'s.
pub fn expand_qualified_wildcard(
qualifier: &str,
schema: &DFSchema,
+ wildcard_options: Option<WildcardAdditionalOptions>,
) -> Result<Vec<Expr>> {
let qualifier = TableReference::from(qualifier);
let qualified_fields: Vec<DFField> = schema
@@ -380,12 +448,20 @@ pub fn expand_qualified_wildcard(
}
let qualified_schema =
DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?;
- // if qualified, allow all columns in output (i.e. ignore using column check)
- Ok(qualified_schema
- .fields()
- .iter()
- .map(|f| Expr::Column(f.qualified_column()))
- .collect::<Vec<Expr>>())
+ let excluded_columns = if let Some(WildcardAdditionalOptions {
+ opt_exclude,
+ opt_except,
+ ..
+ }) = wildcard_options
+ {
+ get_excluded_columns(opt_exclude, opt_except, schema, &Some(qualifier))?
+ } else {
+ vec![]
+ };
+ // Add each excluded `Column` to columns_to_skip
+ let mut columns_to_skip = HashSet::new();
+ columns_to_skip.extend(excluded_columns);
+ Ok(get_exprs_except_skipped(&qualified_schema, columns_to_skip))
}
/// (expr, "is the SortExpr for window (either comes from PARTITION BY or ORDER BY columns)")
diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs
index 2a604bc3ff..f58d4b1597 100644
--- a/datafusion/optimizer/src/replace_distinct_aggregate.rs
+++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs
@@ -53,7 +53,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Distinct(Distinct { input }) => {
- let group_expr = expand_wildcard(input.schema(), input)?;
+ let group_expr = expand_wildcard(input.schema(), input, None)?;
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
input.clone(),
group_expr,
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 612a38d6f0..f12830df42 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -352,7 +352,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Ok(vec![expr])
}
SelectItem::Wildcard(options) => {
- Self::check_wildcard_options(options)?;
+ Self::check_wildcard_options(&options)?;
if empty_from {
return Err(DataFusionError::Plan(
@@ -360,34 +360,33 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
));
}
// do not expand from outer schema
- expand_wildcard(plan.schema().as_ref(), plan)
+ expand_wildcard(plan.schema().as_ref(), plan, Some(options))
}
SelectItem::QualifiedWildcard(ref object_name, options) => {
- Self::check_wildcard_options(options)?;
-
+ Self::check_wildcard_options(&options)?;
let qualifier = format!("{object_name}");
// do not expand from outer schema
- expand_qualified_wildcard(&qualifier, plan.schema().as_ref())
+ expand_qualified_wildcard(
+ &qualifier,
+ plan.schema().as_ref(),
+ Some(options),
+ )
}
}
}
- fn check_wildcard_options(options: WildcardAdditionalOptions) -> Result<()> {
+ fn check_wildcard_options(options: &WildcardAdditionalOptions) -> Result<()> {
let WildcardAdditionalOptions {
- opt_exclude,
- opt_except,
+ // opt_exclude and opt_except are handled during wildcard expansion
+ opt_exclude: _opt_exclude,
+ opt_except: _opt_except,
opt_rename,
opt_replace,
} = options;
- if opt_exclude.is_some()
- || opt_except.is_some()
- || opt_rename.is_some()
- || opt_replace.is_some()
- {
+ if opt_rename.is_some() || opt_replace.is_some() {
Err(DataFusionError::NotImplemented(
- "wildcard * with EXCLUDE, EXCEPT, RENAME or REPLACE not supported "
- .to_string(),
+ "wildcard * with RENAME or REPLACE not supported ".to_string(),
))
} else {
Ok(())