Posted to commits@arrow.apache.org by al...@apache.org on 2023/07/27 11:28:48 UTC
[arrow-datafusion] branch main updated: Initial support for functional dependencies handling primary key and unique constraints (#7040)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5f03146544 Initial support for functional dependencies handling primary key and unique constraints (#7040)
5f03146544 is described below
commit 5f031465447e60d482d5236e2795f7975bbfe45d
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Thu Jul 27 14:28:43 2023 +0300
Initial support for functional dependencies handling primary key and unique constraints (#7040)
* Initial commit
* Add primary key to DFSchema
* store primary key in metadata during schema
* all tests pass
* simplifications
* Move test to the .slt file
* simplifications
* Simplifications
* primary key with associated expression indices
* boilerplate for primary key propagation between executors
* Add new tests
* Add Projection primary key handling
* Keep primary key as vec
* Move hash map to struct
* simplifications
* Update comments
* Update datafusion/core/src/physical_planner.rs
Co-authored-by: Metehan Yıldırım <10...@users.noreply.github.com>
* Remove unnecessary code
* Rename, update comments
* Simplifications
* Minor changes
* minor changes
* Code style + improved comments
* Add new tests, address TODO, Add comments
* Final review
* Address reviews, change design to encapsulate functional dependency
* Rename variables
* Change primary key API
* Fix test
* Functional dependency review
* Address TODO, fix failing tests
* Fix TODO.
* Address reviews
* Implement Constraints struct
* Convert some pub functions to private
* Minor changes
* Minor changes
---------
Co-authored-by: Metehan Yıldırım <10...@users.noreply.github.com>
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
datafusion/common/src/dfschema.rs | 29 +-
datafusion/common/src/functional_dependencies.rs | 520 +++++++++++++++++++++
datafusion/common/src/lib.rs | 5 +
.../core/src/datasource/default_table_source.rs | 13 +-
datafusion/core/src/datasource/memory.rs | 16 +-
datafusion/core/src/datasource/provider.rs | 7 +-
datafusion/core/src/execution/context.rs | 17 +-
datafusion/core/src/physical_planner.rs | 3 +-
.../tests/sqllogictests/test_files/groupby.slt | 387 +++++++++++++++
datafusion/expr/src/logical_plan/builder.rs | 68 ++-
datafusion/expr/src/logical_plan/ddl.rs | 23 +-
datafusion/expr/src/logical_plan/plan.rs | 115 ++++-
datafusion/expr/src/table_source.rs | 9 +-
datafusion/expr/src/utils.rs | 21 +-
.../optimizer/src/analyzer/count_wildcard_rule.rs | 6 +-
datafusion/optimizer/src/optimizer.rs | 4 +-
.../optimizer/src/single_distinct_to_groupby.rs | 11 +-
datafusion/sql/src/query.rs | 4 +-
datafusion/sql/src/select.rs | 53 ++-
datafusion/sql/src/statement.rs | 82 +---
datafusion/sql/tests/sql_integration.rs | 5 +-
21 files changed, 1248 insertions(+), 150 deletions(-)
diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index 8d9736eb64..64937eafa1 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -20,15 +20,17 @@
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
+use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;
use crate::error::{unqualified_field_not_found, DataFusionError, Result, SchemaError};
-use crate::{field_not_found, Column, OwnedTableReference, TableReference};
+use crate::{
+ field_not_found, Column, FunctionalDependencies, OwnedTableReference, TableReference,
+};
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
-use std::fmt::{Display, Formatter};
/// A reference-counted reference to a `DFSchema`.
pub type DFSchemaRef = Arc<DFSchema>;
@@ -40,6 +42,8 @@ pub struct DFSchema {
fields: Vec<DFField>,
/// Additional metadata in form of key value pairs
metadata: HashMap<String, String>,
+ /// Stores functional dependencies in the schema.
+ functional_dependencies: FunctionalDependencies,
}
impl DFSchema {
@@ -48,6 +52,7 @@ impl DFSchema {
Self {
fields: vec![],
metadata: HashMap::new(),
+ functional_dependencies: FunctionalDependencies::empty(),
}
}
@@ -97,7 +102,11 @@ impl DFSchema {
));
}
}
- Ok(Self { fields, metadata })
+ Ok(Self {
+ fields,
+ metadata,
+ functional_dependencies: FunctionalDependencies::empty(),
+ })
}
/// Create a `DFSchema` from an Arrow schema and a given qualifier
@@ -116,6 +125,15 @@ impl DFSchema {
)
}
+ /// Assigns functional dependencies.
+ pub fn with_functional_dependencies(
+ mut self,
+ functional_dependencies: FunctionalDependencies,
+ ) -> Self {
+ self.functional_dependencies = functional_dependencies;
+ self
+ }
+
/// Create a new schema that contains the fields from this schema followed by the fields
/// from the supplied schema. An error will be returned if there are duplicate field names.
pub fn join(&self, schema: &DFSchema) -> Result<Self> {
@@ -471,6 +489,11 @@ impl DFSchema {
pub fn metadata(&self) -> &HashMap<String, String> {
&self.metadata
}
+
+ /// Get functional dependencies
+ pub fn functional_dependencies(&self) -> &FunctionalDependencies {
+ &self.functional_dependencies
+ }
}
impl From<DFSchema> for Schema {
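
A minimal sketch of the new `DFSchema` surface added above, using only APIs
visible in this diff (`DFSchema::empty`, `with_functional_dependencies`, and
the `functional_dependencies` getter); not part of the commit:

    use datafusion_common::{DFSchema, FunctionalDependencies};

    fn main() {
        // Attach (empty) functional dependencies to a schema via the builder:
        let schema = DFSchema::empty()
            .with_functional_dependencies(FunctionalDependencies::empty());
        // Read them back through the new getter:
        assert_eq!(
            schema.functional_dependencies(),
            &FunctionalDependencies::empty()
        );
    }
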
diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs
new file mode 100644
index 0000000000..b7a8e768ec
--- /dev/null
+++ b/datafusion/common/src/functional_dependencies.rs
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! FunctionalDependencies keeps track of functional dependencies
+//! inside DFSchema.
+
+use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};
+use sqlparser::ast::TableConstraint;
+use std::collections::HashSet;
+use std::fmt::{Display, Formatter};
+
+/// This object defines a constraint on a table.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+enum Constraint {
+ /// Columns with the given indices form a composite primary key (they are
+ /// jointly unique and not nullable):
+ PrimaryKey(Vec<usize>),
+ /// Columns with the given indices form a composite unique key:
+ Unique(Vec<usize>),
+}
+
+/// This object encapsulates a list of functional constraints:
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Constraints {
+ inner: Vec<Constraint>,
+}
+
+impl Constraints {
+ /// Create empty constraints
+ pub fn empty() -> Self {
+ Constraints::new(vec![])
+ }
+
+ // This method is private. Outside callers can either create empty
+ // constraints via the `Constraints::empty` API, or build them from table
+ // constraints via the `Constraints::new_from_table_constraints` API.
+ fn new(constraints: Vec<Constraint>) -> Self {
+ Self { inner: constraints }
+ }
+
+ /// Convert each `TableConstraint` to corresponding `Constraint`
+ pub fn new_from_table_constraints(
+ constraints: &[TableConstraint],
+ df_schema: &DFSchemaRef,
+ ) -> Result<Self> {
+ let constraints = constraints
+ .iter()
+ .map(|c: &TableConstraint| match c {
+ TableConstraint::Unique {
+ columns,
+ is_primary,
+ ..
+ } => {
+ // Get primary key and/or unique indices in the schema:
+ let indices = columns
+ .iter()
+ .map(|pk| {
+ let idx = df_schema
+ .fields()
+ .iter()
+ .position(|item| {
+ item.qualified_name() == pk.value.clone()
+ })
+ .ok_or_else(|| {
+ DataFusionError::Execution(
+ "Primary key doesn't exist".to_string(),
+ )
+ })?;
+ Ok(idx)
+ })
+ .collect::<Result<Vec<_>>>()?;
+ Ok(if *is_primary {
+ Constraint::PrimaryKey(indices)
+ } else {
+ Constraint::Unique(indices)
+ })
+ }
+ TableConstraint::ForeignKey { .. } => Err(DataFusionError::Plan(
+ "Foreign key constraints are not currently supported".to_string(),
+ )),
+ TableConstraint::Check { .. } => Err(DataFusionError::Plan(
+ "Check constraints are not currently supported".to_string(),
+ )),
+ TableConstraint::Index { .. } => Err(DataFusionError::Plan(
+ "Indexes are not currently supported".to_string(),
+ )),
+ TableConstraint::FulltextOrSpatial { .. } => Err(DataFusionError::Plan(
+ "Indexes are not currently supported".to_string(),
+ )),
+ })
+ .collect::<Result<Vec<_>>>()?;
+ Ok(Constraints::new(constraints))
+ }
+
+ /// Check whether this collection of constraints is empty
+ pub fn is_empty(&self) -> bool {
+ self.inner.is_empty()
+ }
+}
+
+impl Display for Constraints {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let pk: Vec<String> = self.inner.iter().map(|c| format!("{:?}", c)).collect();
+ let pk = pk.join(", ");
+ if !pk.is_empty() {
+ write!(f, " constraints=[{pk}]")
+ } else {
+ write!(f, "")
+ }
+ }
+}
+
+/// This object defines a functional dependence in the schema. A functional
+/// dependence defines a relationship between determinant keys and dependent
+/// columns. A determinant key is a column, or a set of columns, whose value
+/// uniquely determines values of some other (dependent) columns. If two rows
+/// have the same determinant key, dependent columns in these rows are
+/// necessarily the same. If the determinant key is unique, the set of
+/// dependent columns is equal to the entire schema and the determinant key can
+/// serve as a primary key. Note that a primary key may "downgrade" into a
+/// determinant key due to an operation such as a join, and this object is
+/// used to track dependence relationships in such cases. For more information
+/// on functional dependencies, see:
+/// <https://www.scaler.com/topics/dbms/functional-dependency-in-dbms/>
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct FunctionalDependence {
+ // Column indices of the (possibly composite) determinant key:
+ pub source_indices: Vec<usize>,
+ // Column indices of dependent column(s):
+ pub target_indices: Vec<usize>,
+ /// Flag indicating whether one of the `source_indices` can receive NULL values.
+ /// For a data source, if the constraint in question is `Constraint::Unique`,
+ /// this flag is `true`. If the constraint in question is `Constraint::PrimaryKey`,
+ /// this flag is `false`.
+ /// Note that as the schema changes between different stages in a plan,
+ /// such as after LEFT JOIN or RIGHT JOIN operations, this property may
+ /// change.
+ pub nullable: bool,
+ // The functional dependency mode:
+ pub mode: Dependency,
+}
+
+/// Describes functional dependency mode.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Dependency {
+ Single, // A determinant key may occur only once.
+ Multi, // A determinant key may occur multiple times (in multiple rows).
+}
+
+impl FunctionalDependence {
+ // Creates a new functional dependence.
+ pub fn new(
+ source_indices: Vec<usize>,
+ target_indices: Vec<usize>,
+ nullable: bool,
+ ) -> Self {
+ Self {
+ source_indices,
+ target_indices,
+ nullable,
+ // Start with the least restrictive mode by default:
+ mode: Dependency::Multi,
+ }
+ }
+
+ pub fn with_mode(mut self, mode: Dependency) -> Self {
+ self.mode = mode;
+ self
+ }
+}
+
+/// This object encapsulates all functional dependencies in a given relation.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct FunctionalDependencies {
+ deps: Vec<FunctionalDependence>,
+}
+
+impl FunctionalDependencies {
+ /// Creates an empty `FunctionalDependencies` object.
+ pub fn empty() -> Self {
+ Self { deps: vec![] }
+ }
+
+ /// Creates a new `FunctionalDependencies` object from a vector of
+ /// `FunctionalDependence` objects.
+ pub fn new(dependencies: Vec<FunctionalDependence>) -> Self {
+ Self { deps: dependencies }
+ }
+
+ /// Creates a new `FunctionalDependencies` object from the given constraints.
+ pub fn new_from_constraints(
+ constraints: Option<&Constraints>,
+ n_field: usize,
+ ) -> Self {
+ if let Some(Constraints { inner: constraints }) = constraints {
+ // Construct dependency objects based on each individual constraint:
+ let dependencies = constraints
+ .iter()
+ .map(|constraint| {
+ // All the field indices are associated with the whole table
+ // since we are dealing with table level constraints:
+ let dependency = match constraint {
+ Constraint::PrimaryKey(indices) => FunctionalDependence::new(
+ indices.to_vec(),
+ (0..n_field).collect::<Vec<_>>(),
+ false,
+ ),
+ Constraint::Unique(indices) => FunctionalDependence::new(
+ indices.to_vec(),
+ (0..n_field).collect::<Vec<_>>(),
+ true,
+ ),
+ };
+ // As primary keys are guaranteed to be unique, set the
+ // functional dependency mode to `Dependency::Single`:
+ dependency.with_mode(Dependency::Single)
+ })
+ .collect::<Vec<_>>();
+ Self::new(dependencies)
+ } else {
+ // There is no constraint, return an empty object:
+ Self::empty()
+ }
+ }
+
+ pub fn with_dependency(mut self, mode: Dependency) -> Self {
+ self.deps.iter_mut().for_each(|item| item.mode = mode);
+ self
+ }
+
+ /// Merges the given functional dependencies with these.
+ pub fn extend(&mut self, other: FunctionalDependencies) {
+ self.deps.extend(other.deps);
+ }
+
+ /// Adds the `offset` value to `source_indices` and `target_indices` for
+ /// each functional dependency.
+ pub fn add_offset(&mut self, offset: usize) {
+ self.deps.iter_mut().for_each(
+ |FunctionalDependence {
+ source_indices,
+ target_indices,
+ ..
+ }| {
+ *source_indices = add_offset_to_vec(source_indices, offset);
+ *target_indices = add_offset_to_vec(target_indices, offset);
+ },
+ )
+ }
+
+ /// Updates `source_indices` and `target_indices` of each functional
+ /// dependence using the index mapping given in `proj_indices`.
+ ///
+ /// Assume that `proj_indices` is \[2, 5, 8\] and we have a functional
+ /// dependence \[5\] (`source_indices`) -> \[5, 8\] (`target_indices`).
+ /// In the updated schema, fields at indices \[2, 5, 8\] will transform
+ /// to \[0, 1, 2\]. Therefore, the resulting functional dependence will
+ /// be \[1\] -> \[1, 2\].
+ pub fn project_functional_dependencies(
+ &self,
+ proj_indices: &[usize],
+ // The argument `n_out` denotes the schema field length, which is needed
+ // to correctly associate a `Single`-mode dependence with the whole table.
+ n_out: usize,
+ ) -> FunctionalDependencies {
+ let mut projected_func_dependencies = vec![];
+ for FunctionalDependence {
+ source_indices,
+ target_indices,
+ nullable,
+ mode,
+ } in &self.deps
+ {
+ let new_source_indices =
+ update_elements_with_matching_indices(source_indices, proj_indices);
+ let new_target_indices = if *mode == Dependency::Single {
+ // Associate with all of the fields in the schema:
+ (0..n_out).collect()
+ } else {
+ // Update associations according to projection:
+ update_elements_with_matching_indices(target_indices, proj_indices)
+ };
+ // All of the composite indices should still be valid after projection;
+ // otherwise, functional dependency cannot be propagated.
+ if new_source_indices.len() == source_indices.len() {
+ let new_func_dependence = FunctionalDependence::new(
+ new_source_indices,
+ new_target_indices,
+ *nullable,
+ )
+ .with_mode(*mode);
+ projected_func_dependencies.push(new_func_dependence);
+ }
+ }
+ FunctionalDependencies::new(projected_func_dependencies)
+ }
+
+ /// This function joins this set of functional dependencies with the `other`
+ /// according to the given `join_type`.
+ pub fn join(
+ &self,
+ other: &FunctionalDependencies,
+ join_type: &JoinType,
+ left_cols_len: usize,
+ ) -> FunctionalDependencies {
+ // Get mutable copies of left and right side dependencies:
+ let mut right_func_dependencies = other.clone();
+ let mut left_func_dependencies = self.clone();
+
+ match join_type {
+ JoinType::Inner | JoinType::Left | JoinType::Right => {
+ // Add offset to right schema:
+ right_func_dependencies.add_offset(left_cols_len);
+
+ // Result may have multiple values, update the dependency mode:
+ left_func_dependencies =
+ left_func_dependencies.with_dependency(Dependency::Multi);
+ right_func_dependencies =
+ right_func_dependencies.with_dependency(Dependency::Multi);
+
+ if *join_type == JoinType::Left {
+ // Downgrade the right side, since it may have additional NULL values:
+ right_func_dependencies.downgrade_dependencies();
+ } else if *join_type == JoinType::Right {
+ // Downgrade the left side, since it may have additional NULL values:
+ left_func_dependencies.downgrade_dependencies();
+ }
+ // Combine left and right functional dependencies:
+ left_func_dependencies.extend(right_func_dependencies);
+ left_func_dependencies
+ }
+ JoinType::LeftSemi | JoinType::LeftAnti => {
+ // These joins preserve functional dependencies of the left side:
+ left_func_dependencies
+ }
+ JoinType::RightSemi | JoinType::RightAnti => {
+ // These joins preserve functional dependencies of the right side:
+ right_func_dependencies
+ }
+ JoinType::Full => {
+ // All of the functional dependencies are lost in a FULL join:
+ FunctionalDependencies::empty()
+ }
+ }
+ }
+
+ /// This function downgrades a functional dependency when nullability becomes
+ /// a possibility:
+ /// - If the dependency in question is UNIQUE (i.e. nullable), a new null value
+ /// invalidates the dependency.
+ /// - If the dependency in question is PRIMARY KEY (i.e. not nullable), a new
+ /// null value turns it into UNIQUE mode.
+ fn downgrade_dependencies(&mut self) {
+ // Delete nullable dependencies, since they are no longer valid:
+ self.deps.retain(|item| !item.nullable);
+ self.deps.iter_mut().for_each(|item| item.nullable = true);
+ }
+
+ /// This function ensures that functional dependencies involving uniquely
+ /// occurring determinant keys cover their entire table in terms of
+ /// dependent columns.
+ pub fn extend_target_indices(&mut self, n_out: usize) {
+ self.deps.iter_mut().for_each(
+ |FunctionalDependence {
+ mode,
+ target_indices,
+ ..
+ }| {
+ // If unique, cover the whole table:
+ if *mode == Dependency::Single {
+ *target_indices = (0..n_out).collect::<Vec<_>>();
+ }
+ },
+ )
+ }
+}
+
+/// Calculates functional dependencies for aggregate output, when there is a GROUP BY expression.
+pub fn aggregate_functional_dependencies(
+ aggr_input_schema: &DFSchema,
+ group_by_expr_names: &[String],
+ aggr_schema: &DFSchema,
+) -> FunctionalDependencies {
+ let mut aggregate_func_dependencies = vec![];
+ let aggr_input_fields = aggr_input_schema.fields();
+ let aggr_fields = aggr_schema.fields();
+ // Association covers the whole table:
+ let target_indices = (0..aggr_schema.fields().len()).collect::<Vec<_>>();
+ // Get functional dependencies of the schema:
+ let func_dependencies = aggr_input_schema.functional_dependencies();
+ for FunctionalDependence {
+ source_indices,
+ nullable,
+ mode,
+ ..
+ } in &func_dependencies.deps
+ {
+ // Keep source indices in a `HashSet` to prevent duplicate entries:
+ let mut new_source_indices = HashSet::new();
+ let source_field_names = source_indices
+ .iter()
+ .map(|&idx| aggr_input_fields[idx].qualified_name())
+ .collect::<Vec<_>>();
+ for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() {
+ // When one of the input determinant expressions matches with
+ // the GROUP BY expression, add the index of the GROUP BY
+ // expression as a new determinant key:
+ if source_field_names.contains(group_by_expr_name) {
+ new_source_indices.insert(idx);
+ }
+ }
+ // All of the composite indices occur in the GROUP BY expression:
+ if new_source_indices.len() == source_indices.len() {
+ aggregate_func_dependencies.push(
+ FunctionalDependence::new(
+ new_source_indices.into_iter().collect(),
+ target_indices.clone(),
+ *nullable,
+ )
+ // Input uniqueness is preserved when the GROUP BY matches the determinants of an input functional dependence:
+ .with_mode(*mode),
+ );
+ }
+ }
+ // If we have a single GROUP BY key, we can guarantee uniqueness after
+ // aggregation:
+ if group_by_expr_names.len() == 1 {
+ // If `source_indices` contain 0, delete this functional dependency
+ // as it will be added anyway with mode `Dependency::Single`:
+ if let Some(idx) = aggregate_func_dependencies
+ .iter()
+ .position(|item| item.source_indices.contains(&0))
+ {
+ // Delete the functional dependency that contains zeroth idx:
+ aggregate_func_dependencies.remove(idx);
+ }
+ // Add a new functional dependency associated with the whole table:
+ aggregate_func_dependencies.push(
+ // Use the nullability of the GROUP BY expression:
+ FunctionalDependence::new(
+ vec![0],
+ target_indices,
+ aggr_fields[0].is_nullable(),
+ )
+ .with_mode(Dependency::Single),
+ );
+ }
+ FunctionalDependencies::new(aggregate_func_dependencies)
+}
+
+/// Returns the target indices for determinant keys that are contained
+/// in the GROUP BY expressions.
+pub fn get_target_functional_dependencies(
+ schema: &DFSchema,
+ group_by_expr_names: &[String],
+) -> Option<Vec<usize>> {
+ let mut combined_target_indices = HashSet::new();
+ let dependencies = schema.functional_dependencies();
+ let field_names = schema
+ .fields()
+ .iter()
+ .map(|item| item.qualified_name())
+ .collect::<Vec<_>>();
+ for FunctionalDependence {
+ source_indices,
+ target_indices,
+ ..
+ } in &dependencies.deps
+ {
+ let source_key_names = source_indices
+ .iter()
+ .map(|id_key_idx| field_names[*id_key_idx].clone())
+ .collect::<Vec<_>>();
+ // If the GROUP BY expression contains a determinant key, we can use
+ // the associated fields after aggregation even if they are not part
+ // of the GROUP BY expression.
+ if source_key_names
+ .iter()
+ .all(|source_key_name| group_by_expr_names.contains(source_key_name))
+ {
+ combined_target_indices.extend(target_indices.iter());
+ }
+ }
+ (!combined_target_indices.is_empty())
+ .then_some(combined_target_indices.iter().cloned().collect::<Vec<_>>())
+}
+
+/// Updates entries inside the `entries` vector with their corresponding
+/// indices inside the `proj_indices` vector.
+fn update_elements_with_matching_indices(
+ entries: &[usize],
+ proj_indices: &[usize],
+) -> Vec<usize> {
+ entries
+ .iter()
+ .filter_map(|val| proj_indices.iter().position(|proj_idx| proj_idx == val))
+ .collect()
+}
+
+/// Adds `offset` value to each entry inside `in_data`.
+fn add_offset_to_vec<T: Copy + std::ops::Add<Output = T>>(
+ in_data: &[T],
+ offset: T,
+) -> Vec<T> {
+ in_data.iter().map(|&item| item + offset).collect()
+}
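
To illustrate how the `join` logic above behaves, here is a hedged sketch
(not from the commit) that builds a single-column primary-key dependency for
two two-column tables and joins them, using only the public APIs defined in
this file:

    use datafusion_common::{
        Dependency, FunctionalDependence, FunctionalDependencies, JoinType,
    };

    fn main() {
        // A primary key on column 0 of a two-column table: the key is unique
        // (`Dependency::Single`), not nullable, and determines every column.
        let pk = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![0],
            vec![0, 1],
            false,
        )
        .with_mode(Dependency::Single)]);

        // INNER JOIN keeps both sides (right-side indices are shifted by the
        // left table width), but keys may now repeat, so modes drop to `Multi`:
        let inner = pk.join(&pk, &JoinType::Inner, 2);
        println!("after INNER JOIN: {inner:?}");

        // FULL JOIN can inject NULLs on both sides, so nothing survives:
        assert_eq!(
            pk.join(&pk, &JoinType::Full, 2),
            FunctionalDependencies::empty()
        );
    }
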
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 63b4024579..7a46f28b50 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -23,6 +23,7 @@ pub mod delta;
mod dfschema;
pub mod display;
mod error;
+mod functional_dependencies;
mod join_type;
pub mod parsers;
#[cfg(feature = "pyarrow")]
@@ -41,6 +42,10 @@ pub use error::{
field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError,
SharedResult,
};
+pub use functional_dependencies::{
+ aggregate_functional_dependencies, get_target_functional_dependencies, Constraints,
+ Dependency, FunctionalDependence, FunctionalDependencies,
+};
pub use join_type::{JoinConstraint, JoinType};
pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs
index c6fd87e7f1..bd3e832804 100644
--- a/datafusion/core/src/datasource/default_table_source.rs
+++ b/datafusion/core/src/datasource/default_table_source.rs
@@ -17,12 +17,14 @@
//! Default TableSource implementation used in DataFusion physical plans
+use std::any::Any;
+use std::sync::Arc;
+
use crate::datasource::TableProvider;
+
use arrow::datatypes::SchemaRef;
-use datafusion_common::DataFusionError;
+use datafusion_common::{Constraints, DataFusionError};
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource};
-use std::any::Any;
-use std::sync::Arc;
/// DataFusion default table source, wrapping TableProvider
///
@@ -52,6 +54,11 @@ impl TableSource for DefaultTableSource {
self.table_provider.schema()
}
+ /// Get a reference to applicable constraints, if any exist.
+ fn constraints(&self) -> Option<&Constraints> {
+ self.table_provider.constraints()
+ }
+
/// Tests whether the table provider can make use of any or all filter expressions
/// to optimise data retrieval.
fn supports_filters_pushdown(
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 967eb988eb..4b6653c688 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
-use datafusion_common::SchemaExt;
+use datafusion_common::{Constraints, SchemaExt};
use datafusion_execution::TaskContext;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
@@ -52,6 +52,7 @@ pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
pub struct MemTable {
schema: SchemaRef,
pub(crate) batches: Vec<PartitionData>,
+ constraints: Option<Constraints>,
}
impl MemTable {
@@ -76,9 +77,18 @@ impl MemTable {
.into_iter()
.map(|e| Arc::new(RwLock::new(e)))
.collect::<Vec<_>>(),
+ constraints: None,
})
}
+ /// Assign constraints
+ pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+ if !constraints.is_empty() {
+ self.constraints = Some(constraints);
+ }
+ self
+ }
+
/// Create a mem table by reading from another data source
pub async fn load(
t: Arc<dyn TableProvider>,
@@ -153,6 +163,10 @@ impl TableProvider for MemTable {
self.schema.clone()
}
+ fn constraints(&self) -> Option<&Constraints> {
+ self.constraints.as_ref()
+ }
+
fn table_type(&self) -> TableType {
TableType::Base
}
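
A hedged usage sketch for the new `with_constraints` builder (not part of the
commit). `with_constraints` only stores non-empty constraint sets, and
non-empty `Constraints` come from `Constraints::new_from_table_constraints`,
so with an empty set the `constraints()` accessor still returns `None`:

    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::datasource::{MemTable, TableProvider};
    use datafusion_common::{Constraints, Result};

    fn main() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "sn",
            DataType::Int32,
            false,
        )]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![0, 1, 2]))],
        )?;
        let table = MemTable::try_new(schema, vec![vec![batch]])?
            .with_constraints(Constraints::empty());
        // Empty constraints are not stored:
        assert!(table.constraints().is_none());
        Ok(())
    }
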
diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs
index 11f30f33d1..9c97935105 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -21,7 +21,7 @@ use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
-use datafusion_common::{DataFusionError, Statistics};
+use datafusion_common::{Constraints, DataFusionError, Statistics};
use datafusion_expr::{CreateExternalTable, LogicalPlan};
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
@@ -41,6 +41,11 @@ pub trait TableProvider: Sync + Send {
/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef;
+ /// Get a reference to the constraints of the table.
+ fn constraints(&self) -> Option<&Constraints> {
+ None
+ }
+
/// Get the type of this table for metadata/catalog purposes.
fn table_type(&self) -> TableType;
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 811c2ec765..9a518940e2 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -470,19 +470,12 @@ impl SessionContext {
input,
if_not_exists,
or_replace,
- primary_key,
+ constraints,
} = cmd;
- if !primary_key.is_empty() {
- Err(DataFusionError::Execution(
- "Primary keys on MemoryTables are not currently supported!".to_string(),
- ))?;
- }
-
let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
let input = self.state().optimize(&input)?;
let table = self.table(&name).await;
-
match (if_not_exists, or_replace, table) {
(true, false, Ok(_)) => self.return_empty_dataframe(),
(false, true, Ok(_)) => {
@@ -500,11 +493,15 @@ impl SessionContext {
"'IF NOT EXISTS' cannot coexist with 'REPLACE'".to_string(),
)),
(_, _, Err(_)) => {
- let schema = Arc::new(input.schema().as_ref().into());
+ let df_schema = input.schema();
+ let schema = Arc::new(df_schema.as_ref().into());
let physical = DataFrame::new(self.state(), input);
let batches: Vec<_> = physical.collect_partitioned().await?;
- let table = Arc::new(MemTable::try_new(schema, batches)?);
+ let table = Arc::new(
+ // Pass constraints to the `MemTable`:
+ MemTable::try_new(schema, batches)?.with_constraints(constraints),
+ );
self.register_table(&name, table)?;
self.return_empty_dataframe()
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 913deebddf..176f3cbe50 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -2186,7 +2186,7 @@ mod tests {
dict_id: 0, \
dict_is_ordered: false, \
metadata: {} } }\
- ], metadata: {} }, \
+ ], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
ExecutionPlan schema: Schema { fields: [\
Field { \
name: \"b\", \
@@ -2410,6 +2410,7 @@ mod tests {
);
}
}
+
struct ErrorExtensionPlanner {}
#[async_trait]
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index de57956f0e..b2677679c8 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -2961,3 +2961,390 @@ ORDER BY s.sn
1 FRA 3 2022-01-02T12:00:00 EUR 200
0 GRC 4 2022-01-03T10:00:00 EUR 80
1 TUR 4 2022-01-03T10:00:00 TRY 100
+
+# create a table for testing
+statement ok
+CREATE TABLE sales_global_with_pk (zip_code INT,
+ country VARCHAR(3),
+ sn INT,
+ ts TIMESTAMP,
+ currency VARCHAR(3),
+ amount FLOAT,
+ primary key(sn)
+ ) as VALUES
+ (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+ (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+ (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+ (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+ (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0)
+
+# create a table for testing, where the primary key is composite
+statement ok
+CREATE TABLE sales_global_with_composite_pk (zip_code INT,
+ country VARCHAR(3),
+ sn INT,
+ ts TIMESTAMP,
+ currency VARCHAR(3),
+ amount FLOAT,
+ primary key(sn, ts)
+ ) as VALUES
+ (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+ (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+ (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+ (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+ (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0)
+
+# create a table for testing, where sn is a unique key
+statement ok
+CREATE TABLE sales_global_with_unique (zip_code INT,
+ country VARCHAR(3),
+ sn INT,
+ ts TIMESTAMP,
+ currency VARCHAR(3),
+ amount FLOAT,
+ unique(sn)
+ ) as VALUES
+ (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+ (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+ (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+ (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+ (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0),
+ (1, 'TUR', NULL, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0)
+
+# when the group by contains the primary key expression,
+# we can use all the expressions in the table during selection
+# (not just group by expressions + aggregation results)
+query TT
+EXPLAIN SELECT s.sn, s.amount, 2*s.sn
+ FROM sales_global_with_pk AS s
+ GROUP BY sn
+ ORDER BY sn
+----
+logical_plan
+Sort: s.sn ASC NULLS LAST
+--Projection: s.sn, s.amount, Int64(2) * CAST(s.sn AS Int64)
+----Aggregate: groupBy=[[s.sn, s.amount]], aggr=[[]]
+------SubqueryAlias: s
+--------TableScan: sales_global_with_pk projection=[sn, amount]
+physical_plan
+SortPreservingMergeExec: [sn@0 ASC NULLS LAST]
+--SortExec: expr=[sn@0 ASC NULLS LAST]
+----ProjectionExec: expr=[sn@0 as sn, amount@1 as amount, 2 * CAST(sn@0 AS Int64) as Int64(2) * s.sn]
+------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as amount], aggr=[]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8
+------------AggregateExec: mode=Partial, gby=[sn@0 as sn, amount@1 as amount], aggr=[]
+--------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
+
+query IRI
+SELECT s.sn, s.amount, 2*s.sn
+ FROM sales_global_with_pk AS s
+ GROUP BY sn
+ ORDER BY sn
+----
+0 30 0
+1 50 2
+2 75 4
+3 200 6
+4 100 8
+
+# Join should propagate primary key successfully
+query TT
+EXPLAIN SELECT r.sn, SUM(l.amount), r.amount
+ FROM sales_global_with_pk AS l
+ JOIN sales_global_with_pk AS r
+ ON l.sn >= r.sn
+ GROUP BY r.sn
+ ORDER BY r.sn
+----
+logical_plan
+Sort: r.sn ASC NULLS LAST
+--Projection: r.sn, SUM(l.amount), r.amount
+----Aggregate: groupBy=[[r.sn, r.amount]], aggr=[[SUM(l.amount)]]
+------Projection: l.amount, r.sn, r.amount
+--------Inner Join: Filter: l.sn >= r.sn
+----------SubqueryAlias: l
+------------TableScan: sales_global_with_pk projection=[sn, amount]
+----------SubqueryAlias: r
+------------TableScan: sales_global_with_pk projection=[sn, amount]
+physical_plan
+SortPreservingMergeExec: [sn@0 ASC NULLS LAST]
+--SortExec: expr=[sn@0 ASC NULLS LAST]
+----ProjectionExec: expr=[sn@0 as sn, SUM(l.amount)@2 as SUM(l.amount), amount@1 as amount]
+------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as amount], aggr=[SUM(l.amount)]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8
+------------AggregateExec: mode=Partial, gby=[sn@1 as sn, amount@2 as amount], aggr=[SUM(l.amount)]
+--------------ProjectionExec: expr=[amount@1 as amount, sn@2 as sn, amount@3 as amount]
+----------------NestedLoopJoinExec: join_type=Inner, filter=sn@0 >= sn@1
+------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
+------------------CoalescePartitionsExec
+--------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
+
+query IRR
+SELECT r.sn, SUM(l.amount), r.amount
+ FROM sales_global_with_pk AS l
+ JOIN sales_global_with_pk AS r
+ ON l.sn >= r.sn
+ GROUP BY r.sn
+ ORDER BY r.sn
+----
+0 455 30
+1 425 50
+2 375 75
+3 300 200
+4 100 100
+
+# when the primary key is composite, the GROUP BY must contain all of
+# its columns to associate other fields with the key
+query IRR
+SELECT r.sn, SUM(l.amount), r.amount
+ FROM sales_global_with_composite_pk AS l
+ JOIN sales_global_with_composite_pk AS r
+ ON l.sn >= r.sn
+ GROUP BY r.sn, r.ts
+ ORDER BY r.sn
+----
+0 455 30
+1 425 50
+2 375 75
+3 300 200
+4 100 100
+
+# when the primary key is composite, the GROUP BY must contain all of
+# its columns to associate other fields with the key;
+# if any composite column is missing, associated fields cannot be used
+# in the SELECT expressions, so the query below should fail
+statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.amount could not be resolved from available columns: r.sn, SUM\(l.amount\)
+SELECT r.sn, SUM(l.amount), r.amount
+ FROM sales_global_with_composite_pk AS l
+ JOIN sales_global_with_composite_pk AS r
+ ON l.sn >= r.sn
+ GROUP BY r.sn
+ ORDER BY r.sn
+
+# left join should propagate the right side's constraint
+# if the right side is a primary key (unique and doesn't contain NULL)
+query IRR
+SELECT r.sn, r.amount, SUM(r.amount)
+ FROM (SELECT *
+ FROM sales_global_with_pk as l
+ LEFT JOIN sales_global_with_pk as r
+ ON l.amount >= r.amount + 10)
+ GROUP BY r.sn
+ORDER BY r.sn
+----
+0 30 120
+1 50 150
+2 75 150
+4 100 100
+NULL NULL NULL
+
+# left join shouldn't propagate the right side's constraint
+# if the right side is a unique key (unique and can contain NULL).
+# Please note that this query is the same as the one above, except for the constraint on the table.
+statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.amount could not be resolved from available columns: r.sn, SUM\(r.amount\)
+SELECT r.sn, r.amount, SUM(r.amount)
+ FROM (SELECT *
+ FROM sales_global_with_unique as l
+ LEFT JOIN sales_global_with_unique as r
+ ON l.amount >= r.amount + 10)
+ GROUP BY r.sn
+ORDER BY r.sn
+
+# left semi join should propagate the left side's constraint as is.
+query IRR
+SELECT l.sn, l.amount, SUM(l.amount)
+ FROM (SELECT *
+ FROM sales_global_with_unique as l
+ LEFT SEMI JOIN sales_global_with_unique as r
+ ON l.amount >= r.amount + 10)
+ GROUP BY l.sn
+ORDER BY l.sn
+----
+1 50 50
+2 75 75
+3 200 200
+4 100 100
+NULL 100 100
+
+# Similarly, a left anti join should propagate the left side's constraint as is.
+query IRR
+SELECT l.sn, l.amount, SUM(l.amount)
+ FROM (SELECT *
+ FROM sales_global_with_unique as l
+ LEFT ANTI JOIN sales_global_with_unique as r
+ ON l.amount >= r.amount + 10)
+ GROUP BY l.sn
+ORDER BY l.sn
+----
+0 30 30
+
+
+# the primary key should track which columns are associated with it
+statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.sn could not be resolved from available columns: l.sn, SUM\(l.amount\)
+SELECT l.sn, r.sn, SUM(l.amount), r.amount
+ FROM sales_global_with_pk AS l
+ JOIN sales_global_with_pk AS r
+ ON l.sn >= r.sn
+ GROUP BY l.sn
+ ORDER BY l.sn
+
+# window should propagate primary key successfully
+query TT
+EXPLAIN SELECT *
+ FROM(SELECT *, SUM(l.amount) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum_amount
+ FROM sales_global_with_pk AS l
+ ) as l
+ GROUP BY l.sn
+ ORDER BY l.sn
+----
+logical_plan
+Sort: l.sn ASC NULLS LAST
+--Projection: l.zip_code, l.country, l.sn, l.ts, l.currency, l.amount, l.sum_amount
+----Aggregate: groupBy=[[l.sn, l.zip_code, l.country, l.ts, l.currency, l.amount, l.sum_amount]], aggr=[[]]
+------SubqueryAlias: l
+--------Projection: l.zip_code, l.country, l.sn, l.ts, l.currency, l.amount, SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS sum_amount
+----------WindowAggr: windowExpr=[[SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+------------SubqueryAlias: l
+--------------TableScan: sales_global_with_pk projection=[zip_code, country, sn, ts, currency, amount]
+physical_plan
+SortPreservingMergeExec: [sn@2 ASC NULLS LAST]
+--SortExec: expr=[sn@2 ASC NULLS LAST]
+----ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount]
+------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, zip_code@1 as zip_code, country@2 as country, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount], aggr=[]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([sn@0, zip_code@1, country@2, ts@3, currency@4, amount@5, sum_amount@6], 8), input_partitions=1
+------------AggregateExec: mode=Partial, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount], aggr=[]
+--------------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, amount@5 as amount, SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@6 as sum_amount]
+----------------BoundedWindowAggExec: wdw=[SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
+------------------CoalescePartitionsExec
+--------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
+
+query ITIPTRR
+SELECT *
+ FROM(SELECT *, SUM(l.amount) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum_amount
+ FROM sales_global_with_pk AS l
+ ) as l
+ GROUP BY l.sn
+ ORDER BY l.sn
+----
+0 GRC 0 2022-01-01T06:00:00 EUR 30 80
+1 FRA 1 2022-01-01T08:00:00 EUR 50 155
+1 TUR 2 2022-01-01T11:30:00 TRY 75 325
+1 FRA 3 2022-01-02T12:00:00 EUR 200 375
+1 TUR 4 2022-01-03T10:00:00 TRY 100 300
+
+# join should propagate primary key correctly
+query IRP
+SELECT l.sn, SUM(l.amount), l.ts
+FROM
+ (SELECT *
+ FROM sales_global_with_pk AS l
+ JOIN sales_global_with_pk AS r ON l.sn >= r.sn)
+GROUP BY l.sn
+ORDER BY l.sn
+----
+0 30 2022-01-01T06:00:00
+1 100 2022-01-01T08:00:00
+2 225 2022-01-01T11:30:00
+3 800 2022-01-02T12:00:00
+4 500 2022-01-03T10:00:00
+
+# Projection propagates primary keys correctly
+# (we can use r.ts at the final projection, because it
+# is associated with primary key r.sn)
+query IRP
+SELECT r.sn, SUM(r.amount), r.ts
+FROM
+ (SELECT r.ts, r.sn, r.amount
+ FROM
+ (SELECT *
+ FROM sales_global_with_pk AS l
+ JOIN sales_global_with_pk AS r ON l.sn >= r.sn))
+GROUP BY r.sn
+ORDER BY r.sn
+----
+0 150 2022-01-01T06:00:00
+1 200 2022-01-01T08:00:00
+2 225 2022-01-01T11:30:00
+3 400 2022-01-02T12:00:00
+4 100 2022-01-03T10:00:00
+
+# after join, new window expressions shouldn't be associated with primary keys
+statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression rn1 could not be resolved from available columns: r.sn, SUM\(r.amount\)
+SELECT r.sn, SUM(r.amount), rn1
+FROM
+ (SELECT r.ts, r.sn, r.amount,
+ ROW_NUMBER() OVER() AS rn1
+ FROM
+ (SELECT *
+ FROM sales_global_with_pk AS l
+ JOIN sales_global_with_pk AS r ON l.sn >= r.sn))
+GROUP BY r.sn
+
+# aggregate should propagate primary key successfully
+query IPR
+SELECT sn, ts, sum1
+FROM (
+ SELECT ts, sn, SUM(amount) as sum1
+ FROM sales_global_with_pk
+ GROUP BY sn)
+GROUP BY sn
+ORDER BY sn
+----
+0 2022-01-01T06:00:00 30
+1 2022-01-01T08:00:00 50
+2 2022-01-01T11:30:00 75
+3 2022-01-02T12:00:00 200
+4 2022-01-03T10:00:00 100
+
+# aggregate should be able to introduce a functional dependence
+# (when the GROUP BY contains a single expression, that expression
+# becomes a determinant after aggregation, since we are sure
+# it will consist of unique values).
+# please note that ts is not a primary key; we can still
+# use sum1 after the outer aggregation, because ts becomes
+# a determinant of a functional dependence after the
+# inner aggregation.
+query PR
+SELECT ts, sum1
+FROM (
+ SELECT ts, SUM(amount) as sum1
+ FROM sales_global_with_pk
+ GROUP BY ts)
+GROUP BY ts
+ORDER BY ts
+----
+2022-01-01T06:00:00 30
+2022-01-01T08:00:00 50
+2022-01-01T11:30:00 75
+2022-01-02T12:00:00 200
+2022-01-03T10:00:00 100
+
+# aggregate should update its functional dependence mode
+# if it is guaranteed that group by expressions will be
+# unique after aggregation.
+query IRI
+SELECT *
+FROM (
+ SELECT *, ROW_NUMBER() OVER(ORDER BY l.sn) AS rn1
+ FROM (
+ SELECT l.sn, SUM(l.amount)
+ FROM (
+ SELECT l.sn, l.amount, SUM(l.amount) as sum1
+ FROM
+ (SELECT *
+ FROM sales_global_with_pk AS l
+ JOIN sales_global_with_pk AS r ON l.sn >= r.sn)
+ GROUP BY l.sn)
+ GROUP BY l.sn)
+ )
+GROUP BY l.sn
+ORDER BY l.sn
+----
+0 30 1
+1 50 2
+2 75 3
+3 200 4
+4 100 5
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index cd8940e134..4d985456f9 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -42,7 +42,8 @@ use crate::{
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::{
display::ToStringifiedPlan, Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
- OwnedTableReference, Result, ScalarValue, TableReference, ToDFSchema,
+ FunctionalDependencies, OwnedTableReference, Result, ScalarValue, TableReference,
+ ToDFSchema,
};
use std::any::Any;
use std::cmp::Ordering;
@@ -263,10 +264,16 @@ impl LogicalPlanBuilder {
}
let schema = table_source.schema();
+ let func_dependencies = FunctionalDependencies::new_from_constraints(
+ table_source.constraints(),
+ schema.fields.len(),
+ );
let projected_schema = projection
.as_ref()
.map(|p| {
+ let projected_func_dependencies =
+ func_dependencies.project_functional_dependencies(p, p.len());
DFSchema::new_with_metadata(
p.iter()
.map(|i| {
@@ -278,9 +285,14 @@ impl LogicalPlanBuilder {
.collect(),
schema.metadata().clone(),
)
+ .map(|df_schema| {
+ df_schema.with_functional_dependencies(projected_func_dependencies)
+ })
})
.unwrap_or_else(|| {
- DFSchema::try_from_qualified_schema(table_name.clone(), &schema)
+ DFSchema::try_from_qualified_schema(table_name.clone(), &schema).map(
+ |df_schema| df_schema.with_functional_dependencies(func_dependencies),
+ )
})?;
let table_scan = LogicalPlan::TableScan(TableScan {
@@ -803,11 +815,12 @@ impl LogicalPlanBuilder {
/// Apply a cross join
pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
- let schema = self.plan.schema().join(right.schema())?;
+ let join_schema =
+ build_join_schema(self.plan.schema(), right.schema(), &JoinType::Inner)?;
Ok(Self::from(LogicalPlan::CrossJoin(CrossJoin {
left: Arc::new(self.plan),
right: Arc::new(right),
- schema: DFSchemaRef::new(schema),
+ schema: DFSchemaRef::new(join_schema),
})))
}
@@ -1086,10 +1099,15 @@ pub fn build_join_schema(
right_fields.clone()
}
};
-
+ let func_dependencies = left.functional_dependencies().join(
+ right.functional_dependencies(),
+ join_type,
+ left_fields.len(),
+ );
let mut metadata = left.metadata().clone();
metadata.extend(right.metadata().clone());
- DFSchema::new_with_metadata(fields, metadata)
+ Ok(DFSchema::new_with_metadata(fields, metadata)?
+ .with_functional_dependencies(func_dependencies))
}
/// Errors if one or more expressions have equal names.
@@ -1400,10 +1418,11 @@ pub fn unnest(input: LogicalPlan, column: Column) -> Result<LogicalPlan> {
})
.collect::<Vec<_>>();
- let schema = Arc::new(DFSchema::new_with_metadata(
- fields,
- input_schema.metadata().clone(),
- )?);
+ let schema = Arc::new(
+ DFSchema::new_with_metadata(fields, input_schema.metadata().clone())?
+ // We can use the existing functional dependencies:
+ .with_functional_dependencies(input_schema.functional_dependencies().clone()),
+ );
Ok(LogicalPlan::Unnest(Unnest {
input: Arc::new(input),
@@ -1414,14 +1433,16 @@ pub fn unnest(input: LogicalPlan, column: Column) -> Result<LogicalPlan> {
#[cfg(test)]
mod tests {
- use crate::{expr, expr_fn::exists};
- use arrow::datatypes::{DataType, Field};
- use datafusion_common::{OwnedTableReference, SchemaError, TableReference};
-
use crate::logical_plan::StringifiedPlan;
+ use crate::{col, in_subquery, lit, scalar_subquery, sum};
+ use crate::{expr, expr_fn::exists};
use super::*;
- use crate::{col, in_subquery, lit, scalar_subquery, sum};
+
+ use arrow::datatypes::{DataType, Field};
+ use datafusion_common::{
+ FunctionalDependence, OwnedTableReference, SchemaError, TableReference,
+ };
#[test]
fn plan_builder_simple() -> Result<()> {
@@ -1922,4 +1943,21 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_get_updated_id_keys() {
+ let fund_dependencies =
+ FunctionalDependencies::new(vec![FunctionalDependence::new(
+ vec![1],
+ vec![0, 1, 2],
+ true,
+ )]);
+ let res = fund_dependencies.project_functional_dependencies(&[1, 2], 2);
+ let expected = FunctionalDependencies::new(vec![FunctionalDependence::new(
+ vec![0],
+ vec![0, 1],
+ true,
+ )]);
+ assert_eq!(res, expected);
+ }
}
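
The scan path above hinges on the projection rule exercised by
`test_get_updated_id_keys`: a composite determinant key survives a projection
only if every one of its columns is kept. A short sketch of that behaviour
(the indices are illustrative assumptions, not from the commit):

    use datafusion_common::{FunctionalDependence, FunctionalDependencies};

    fn main() {
        // Composite key over columns 0 and 1 of a three-column schema:
        let deps = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![0, 1],
            vec![0, 1, 2],
            false,
        )]);
        // Projecting columns [1, 2] loses column 0, so the dependency is dropped:
        assert_eq!(
            deps.project_functional_dependencies(&[1, 2], 2),
            FunctionalDependencies::empty()
        );
        // Projecting columns [0, 1] keeps the key; indices are remapped:
        let kept = deps.project_functional_dependencies(&[0, 1], 2);
        assert_eq!(
            kept,
            FunctionalDependencies::new(vec![FunctionalDependence::new(
                vec![0, 1],
                vec![0, 1],
                false,
            )])
        );
    }
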
diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs
index e005f11471..dc247da364 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -15,10 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion_common::{
- parsers::CompressionTypeVariant, DFSchemaRef, OwnedTableReference,
-};
-use datafusion_common::{Column, OwnedSchemaReference};
use std::collections::HashMap;
use std::sync::Arc;
use std::{
@@ -28,6 +24,11 @@ use std::{
use crate::{Expr, LogicalPlan};
+use datafusion_common::parsers::CompressionTypeVariant;
+use datafusion_common::{
+ Constraints, DFSchemaRef, OwnedSchemaReference, OwnedTableReference,
+};
+
/// Various types of DDL (CREATE / DROP) catalog manipulation
#[derive(Clone, PartialEq, Eq, Hash)]
pub enum DdlStatement {
@@ -117,16 +118,10 @@ impl DdlStatement {
}
DdlStatement::CreateMemoryTable(CreateMemoryTable {
name,
- primary_key,
+ constraints,
..
}) => {
- let pk: Vec<String> =
- primary_key.iter().map(|c| c.name.to_string()).collect();
- let mut pk = pk.join(", ");
- if !pk.is_empty() {
- pk = format!(" primary_key=[{pk}]");
- }
- write!(f, "CreateMemoryTable: {name:?}{pk}")
+ write!(f, "CreateMemoryTable: {name:?}{constraints}")
}
DdlStatement::CreateView(CreateView { name, .. }) => {
write!(f, "CreateView: {name:?}")
@@ -222,8 +217,8 @@ impl Hash for CreateExternalTable {
pub struct CreateMemoryTable {
/// The table name
pub name: OwnedTableReference,
- /// The ordered list of columns in the primary key, or an empty vector if none
- pub primary_key: Vec<Column>,
+ /// The list of constraints in the schema, such as primary key, unique, etc.
+ pub constraints: Constraints,
/// The logical plan
pub input: Arc<LogicalPlan>,
/// Option to not error if table already exists
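
The `{constraints}` interpolation above relies on the `Display` impl in
`functional_dependencies.rs`: empty constraint sets render as an empty
string, so plans without constraints print exactly as before. A small sketch
(not from the commit):

    use datafusion_common::Constraints;

    fn main() {
        // Empty constraints render as "", so `CreateMemoryTable: {name}{constraints}`
        // degrades gracefully when no constraints are declared:
        assert_eq!(format!("{}", Constraints::empty()), "");
    }
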
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index be8270ddc3..c0af08a366 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -30,24 +30,25 @@ use crate::utils::{
use crate::{
build_join_schema, Expr, ExprSchemable, TableProviderFilterPushDown, TableSource,
};
+
+use super::DdlStatement;
+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeVisitor, VisitRecursion,
};
use datafusion_common::{
- plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
- OwnedTableReference, Result, ScalarValue,
+ aggregate_functional_dependencies, plan_err, Column, DFField, DFSchema, DFSchemaRef,
+ DataFusionError, FunctionalDependencies, OwnedTableReference, Result, ScalarValue,
};
-use std::collections::{HashMap, HashSet};
-use std::fmt::{self, Debug, Display, Formatter};
-use std::hash::{Hash, Hasher};
-use std::sync::Arc;
-
// backwards compatibility
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};
-use super::DdlStatement;
+use std::collections::{HashMap, HashSet};
+use std::fmt::{self, Debug, Display, Formatter};
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
/// A LogicalPlan represents the different types of relational
/// operators (such as Projection, Filter, etc) and can be created by
@@ -1300,6 +1301,11 @@ impl Projection {
if expr.len() != schema.fields().len() {
return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len())));
}
+ // Update functional dependencies of `input` according to projection
+ // expressions:
+ let id_key_groups = calc_func_dependencies_for_project(&expr, &input)?;
+ let schema = schema.as_ref().clone();
+ let schema = Arc::new(schema.with_functional_dependencies(id_key_groups));
Ok(Self {
expr,
input,
@@ -1343,8 +1349,13 @@ impl SubqueryAlias {
) -> Result<Self> {
let alias = alias.into();
let schema: Schema = plan.schema().as_ref().clone().into();
- let schema =
- DFSchemaRef::new(DFSchema::try_from_qualified_schema(&alias, &schema)?);
+ // Since the schema is the same apart from the qualifier, we can reuse
+ // the existing functional dependencies:
+ let func_dependencies = plan.schema().functional_dependencies().clone();
+ let schema = DFSchemaRef::new(
+ DFSchema::try_from_qualified_schema(&alias, &schema)?
+ .with_functional_dependencies(func_dependencies),
+ );
Ok(SubqueryAlias {
input: Arc::new(plan),
alias,
@@ -1420,10 +1431,18 @@ impl Window {
.extend_from_slice(&exprlist_to_fields(window_expr.iter(), input.as_ref())?);
let metadata = input.schema().metadata().clone();
+ // Update functional dependencies for window:
+ let mut window_func_dependencies =
+ input.schema().functional_dependencies().clone();
+ window_func_dependencies.extend_target_indices(window_fields.len());
+
Ok(Window {
input,
window_expr,
- schema: Arc::new(DFSchema::new_with_metadata(window_fields, metadata)?),
+ schema: Arc::new(
+ DFSchema::new_with_metadata(window_fields, metadata)?
+ .with_functional_dependencies(window_func_dependencies),
+ ),
})
}
}
@@ -1610,10 +1629,12 @@ impl Aggregate {
let group_expr = enumerate_grouping_sets(group_expr)?;
let grouping_expr: Vec<Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
let all_expr = grouping_expr.iter().chain(aggr_expr.iter());
+
let schema = DFSchema::new_with_metadata(
exprlist_to_fields(all_expr, &input)?,
input.schema().metadata().clone(),
)?;
+
Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
}
@@ -1642,6 +1663,13 @@ impl Aggregate {
schema.fields().len()
)));
}
+
+ let aggregate_func_dependencies =
+ calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
+ let new_schema = schema.as_ref().clone();
+ let schema = Arc::new(
+ new_schema.with_functional_dependencies(aggregate_func_dependencies),
+ );
Ok(Self {
input,
group_expr,
@@ -1651,6 +1679,71 @@ impl Aggregate {
}
}
+/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
+fn contains_grouping_set(group_expr: &[Expr]) -> bool {
+ group_expr
+ .iter()
+ .any(|expr| matches!(expr, Expr::GroupingSet(_)))
+}
+
+/// Calculates functional dependencies for aggregate expressions.
+fn calc_func_dependencies_for_aggregate(
+ // Expressions in the GROUP BY clause:
+ group_expr: &[Expr],
+ // Input plan of the aggregate:
+ input: &LogicalPlan,
+ // Aggregate schema
+ aggr_schema: &DFSchema,
+) -> Result<FunctionalDependencies> {
+ // We can do a case analysis on how to propagate functional dependencies based on
+ // whether the GROUP BY in question contains a grouping set expression:
+ // - If so, the functional dependencies will be empty because we cannot guarantee
+ // that GROUP BY expression results will be unique.
+ // - Otherwise, it may be possible to propagate functional dependencies.
+ if !contains_grouping_set(group_expr) {
+ let group_by_expr_names = group_expr
+ .iter()
+ .map(|item| item.display_name())
+ .collect::<Result<Vec<_>>>()?;
+ let aggregate_func_dependencies = aggregate_functional_dependencies(
+ input.schema(),
+ &group_by_expr_names,
+ aggr_schema,
+ );
+ Ok(aggregate_func_dependencies)
+ } else {
+ Ok(FunctionalDependencies::empty())
+ }
+}
+
+/// This function projects functional dependencies of the `input` plan according
+/// to projection expressions `exprs`.
+fn calc_func_dependencies_for_project(
+ exprs: &[Expr],
+ input: &LogicalPlan,
+) -> Result<FunctionalDependencies> {
+ let input_fields = input.schema().fields();
+ // Calculate expression indices (if present) in the input schema.
+ let proj_indices = exprs
+ .iter()
+ .filter_map(|expr| {
+ let expr_name = match expr {
+ Expr::Alias(alias) => {
+ format!("{}", alias.expr)
+ }
+ _ => format!("{}", expr),
+ };
+ input_fields
+ .iter()
+ .position(|item| item.qualified_name() == expr_name)
+ })
+ .collect::<Vec<_>>();
+ Ok(input
+ .schema()
+ .functional_dependencies()
+ .project_functional_dependencies(&proj_indices, exprs.len()))
+}
+
/// Sorts its input according to a list of sort expressions.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Sort {
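
A hedged sketch of how `Aggregate::try_new_with_schema` ends up with a
`Single`-mode dependency via `aggregate_functional_dependencies` (the table
name `s`, the fields, and the schemas are illustrative assumptions; the
function itself is the one exported from `datafusion_common`):

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{
        aggregate_functional_dependencies, DFSchema, FunctionalDependence,
        FunctionalDependencies, Result,
    };

    fn main() -> Result<()> {
        let fields = Schema::new(vec![
            Field::new("sn", DataType::Int32, false),
            Field::new("amount", DataType::Float64, true),
        ]);
        // Input schema where `sn` is a primary key (unique, not nullable):
        let input = DFSchema::try_from_qualified_schema("s", &fields)?
            .with_functional_dependencies(FunctionalDependencies::new(vec![
                FunctionalDependence::new(vec![0], vec![0, 1], false),
            ]));
        // Output schema of e.g. `SELECT sn, SUM(amount) ... GROUP BY sn`,
        // sketched here with the same field layout for brevity:
        let aggr_schema = DFSchema::try_from_qualified_schema("s", &fields)?;
        let deps = aggregate_functional_dependencies(
            &input,
            &["s.sn".to_string()],
            &aggr_schema,
        );
        // A single GROUP BY key guarantees uniqueness after aggregation, so
        // a dependency covering the whole aggregate schema is produced:
        assert_ne!(deps, FunctionalDependencies::empty());
        Ok(())
    }
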
diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs
index 2f5a8923e8..b83ce77813 100644
--- a/datafusion/expr/src/table_source.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -18,8 +18,10 @@
//! Table source
use crate::{Expr, LogicalPlan};
+
use arrow::datatypes::SchemaRef;
-use datafusion_common::Result;
+use datafusion_common::{Constraints, Result};
+
use std::any::Any;
/// Indicates whether and how a filter expression can be handled by a
@@ -64,6 +66,11 @@ pub trait TableSource: Sync + Send {
/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef;
+ /// Get a reference to any constraints on this table, if they exist.
+ fn constraints(&self) -> Option<&Constraints> {
+ None
+ }
+
/// Get the type of this table for metadata/catalog purposes.
fn table_type(&self) -> TableType {
TableType::Base
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 3ddfea5105..efaf291398 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -34,8 +34,8 @@ use datafusion_common::tree_node::{
RewriteRecursion, TreeNode, TreeNodeRewriter, VisitRecursion,
};
use datafusion_common::{
- Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
- TableReference,
+ Column, Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, Result,
+ ScalarValue, TableReference,
};
use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, WildcardAdditionalOptions};
use std::cmp::Ordering;
@@ -446,7 +446,9 @@ pub fn expand_qualified_wildcard(
)));
}
let qualified_schema =
- DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?;
+ DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?
+ // We can use the functional dependencies as-is, since they only store field indices:
+ .with_functional_dependencies(schema.functional_dependencies().clone());
let excluded_columns = if let Some(WildcardAdditionalOptions {
opt_exclude,
opt_except,
@@ -921,7 +923,7 @@ pub fn from_plan(
})) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
input: Arc::new(inputs[0].clone()),
- primary_key: vec![],
+ constraints: Constraints::empty(),
name: name.clone(),
if_not_exists: *if_not_exists,
or_replace: *or_replace,
@@ -1016,10 +1018,13 @@ pub fn from_plan(
})
.collect::<Vec<_>>();
- let schema = Arc::new(DFSchema::new_with_metadata(
- fields,
- input.schema().metadata().clone(),
- )?);
+ let schema = Arc::new(
+ DFSchema::new_with_metadata(fields, input.schema().metadata().clone())?
+ // We can use the existing functional dependencies as is:
+ .with_functional_dependencies(
+ input.schema().functional_dependencies().clone(),
+ ),
+ );
Ok(LogicalPlan::Unnest(Unnest {
input,
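
This hunk and the count_wildcard_rule.rs one below follow the same rule of thumb: a DFSchema rebuilt with unchanged field order may adopt the old schema's functional dependencies verbatim, because dependencies are stored as field indices. A small sketch of the pattern, assuming the datafusion_common API at this commit (DFField::new_unqualified in particular is an assumption):

    use std::collections::HashMap;

    use arrow::datatypes::DataType;
    use datafusion_common::{DFField, DFSchema, Result};

    fn rebuild_preserving_deps(old: &DFSchema) -> Result<DFSchema> {
        Ok(
            DFSchema::new_with_metadata(old.fields().clone(), old.metadata().clone())?
                // Field order is unchanged, so indices stay valid and the
                // dependencies carry over verbatim:
                .with_functional_dependencies(old.functional_dependencies().clone()),
        )
    }

    fn main() -> Result<()> {
        let schema = DFSchema::new_with_metadata(
            vec![DFField::new_unqualified("id", DataType::Int32, false)],
            HashMap::new(),
        )?;
        let rebuilt = rebuild_preserving_deps(&schema)?;
        assert_eq!(schema, rebuilt);
        Ok(())
    }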
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index f4a5147813..95061e3854 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -262,8 +262,10 @@ fn rewrite_schema(schema: &DFSchema) -> DFSchemaRef {
)
})
.collect::<Vec<DFField>>();
- DFSchemaRef::new(
- DFSchema::new_with_metadata(new_fields, schema.metadata().clone()).unwrap(),
+ Arc::new(
+ DFSchema::new_with_metadata(new_fields, schema.metadata().clone())
+ .unwrap()
+ .with_functional_dependencies(schema.functional_dependencies().clone()),
)
}
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 2306593d42..920b9ea18f 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -493,12 +493,12 @@ mod tests {
assert_eq!(
"get table_scan rule\ncaused by\n\
Internal error: Optimizer rule 'get table_scan rule' failed, due to generate a different schema, \
- original schema: DFSchema { fields: [], metadata: {} }, \
+ original schema: DFSchema { fields: [], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
new schema: DFSchema { fields: [\
DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \
DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \
DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }], \
- metadata: {} }. \
+ metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }. \
This was likely caused by a bug in DataFusion's code \
and we would welcome that you file an bug report in our issue tracker",
err.to_string()
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index ba7e89094b..a9e65b3e7c 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -167,12 +167,13 @@ impl OptimizerRule for SingleDistinctToGroupBy {
Vec::new(),
)?);
+ let outer_fields = outer_group_exprs
+ .iter()
+ .chain(new_aggr_exprs.iter())
+ .map(|expr| expr.to_field(&inner_schema))
+ .collect::<Result<Vec<_>>>()?;
let outer_aggr_schema = Arc::new(DFSchema::new_with_metadata(
- outer_group_exprs
- .iter()
- .chain(new_aggr_exprs.iter())
- .map(|expr| expr.to_field(&inner_schema))
- .collect::<Result<Vec<_>>>()?,
+ outer_fields,
input.schema().metadata().clone(),
)?);
diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs
index 2d7771d8c7..34b24b0594 100644
--- a/datafusion/sql/src/query.rs
+++ b/datafusion/sql/src/query.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{Constraints, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
CreateMemoryTable, DdlStatement, Expr, LogicalPlan, LogicalPlanBuilder,
};
@@ -86,7 +86,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let select_into = select.into.unwrap();
LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
name: self.object_name_to_table_reference(select_into.name)?,
- primary_key: Vec::new(),
+ constraints: Constraints::empty(),
input: Arc::new(plan),
if_not_exists: false,
or_replace: false,
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 7841858bac..daf79e969e 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -15,12 +15,19 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashSet;
+use std::sync::Arc;
+
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::utils::{
check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
resolve_columns, resolve_positions_to_exprs,
};
-use datafusion_common::{DataFusionError, Result};
+
+use datafusion_common::{
+ get_target_functional_dependencies, DFSchemaRef, DataFusionError, Result,
+};
+use datafusion_expr::expr::Alias;
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
};
@@ -32,12 +39,8 @@ use datafusion_expr::utils::{
use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
-
-use datafusion_expr::expr::Alias;
use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions, WindowType};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};
-use std::collections::HashSet;
-use std::sync::Arc;
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
/// Generate a logic plan from an SQL select
@@ -431,6 +434,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
group_by_exprs: Vec<Expr>,
aggr_exprs: Vec<Expr>,
) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)> {
+ let group_by_exprs =
+ get_updated_group_by_exprs(&group_by_exprs, select_exprs, input.schema())?;
+
// create the aggregate plan
let plan = LogicalPlanBuilder::from(input.clone())
.aggregate(group_by_exprs.clone(), aggr_exprs.clone())?
@@ -555,3 +561,40 @@ fn match_window_definitions(
}
Ok(())
}
+
+/// Updates GROUP BY expressions according to functional dependencies.
+fn get_updated_group_by_exprs(
+ group_by_exprs: &[Expr],
+ select_exprs: &[Expr],
+ schema: &DFSchemaRef,
+) -> Result<Vec<Expr>> {
+ let mut new_group_by_exprs = group_by_exprs.to_vec();
+ let fields = schema.fields();
+ let group_by_expr_names = group_by_exprs
+ .iter()
+ .map(|group_by_expr| group_by_expr.display_name())
+ .collect::<Result<Vec<_>>>()?;
+ // Get targets that can be used in the SELECT list even though they do not occur in the GROUP BY:
+ if let Some(target_indices) =
+ get_target_functional_dependencies(schema, &group_by_expr_names)
+ {
+ // Calculate the names of the fields that depend on the determinant GROUP BY expressions:
+ let associated_field_names = target_indices
+ .iter()
+ .map(|idx| fields[*idx].qualified_name())
+ .collect::<Vec<_>>();
+ // Expand GROUP BY expressions with SELECT expressions: if a GROUP BY
+ // expression is a determinant key, its dependent columns may appear
+ // in the SELECT list without being grouped or aggregated.
+ for expr in select_exprs {
+ let expr_name = format!("{}", expr);
+ if !new_group_by_exprs.contains(expr)
+ && associated_field_names.contains(&expr_name)
+ {
+ new_group_by_exprs.push(expr.clone());
+ }
+ }
+ }
+
+ Ok(new_group_by_exprs)
+}
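
This rewrite is what makes the new behavior visible end to end: grouping by a primary key lets functionally dependent columns appear bare in the SELECT list, because they are appended to the GROUP BY here. A hedged sketch against the datafusion crate at this commit (the table, data, and tokio runtime are illustrative assumptions):

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        ctx.sql(
            "CREATE TABLE person (id INT, name VARCHAR, age INT, PRIMARY KEY (id)) \
             AS VALUES (1, 'alice', 30), (2, 'bob', 40)",
        )
        .await?;
        // `name` is functionally dependent on the primary key `id`, so it may
        // appear in the SELECT list even though only `id` is grouped:
        let df = ctx
            .sql("SELECT id, name, SUM(age) FROM person GROUP BY id")
            .await?;
        df.show().await?;
        Ok(())
    }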
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index a5ff3633ac..4af32337f7 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -23,12 +23,15 @@ use crate::planner::{
object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
};
use crate::utils::normalize_ident;
+
use arrow_schema::DataType;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
- unqualified_field_not_found, Column, DFField, DFSchema, DFSchemaRef, DataFusionError,
- ExprSchema, OwnedTableReference, Result, SchemaReference, TableReference, ToDFSchema,
+ unqualified_field_not_found, Column, Constraints, DFField, DFSchema, DFSchemaRef,
+ DataFusionError, ExprSchema, OwnedTableReference, Result, SchemaReference,
+ TableReference, ToDFSchema,
};
+use datafusion_expr::expr::Placeholder;
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::builder::project;
use datafusion_expr::logical_plan::DdlStatement;
@@ -45,12 +48,11 @@ use datafusion_expr::{
use sqlparser::ast;
use sqlparser::ast::{
Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, Query, SchemaName,
- SetExpr, ShowCreateObject, ShowStatementFilter, Statement, TableConstraint,
- TableFactor, TableWithJoins, TransactionMode, UnaryOperator, Value,
+ SetExpr, ShowCreateObject, ShowStatementFilter, Statement, TableFactor,
+ TableWithJoins, TransactionMode, UnaryOperator, Value,
};
-
-use datafusion_expr::expr::Placeholder;
use sqlparser::parser::ParserError::ParserError;
+
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
@@ -132,8 +134,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
..
} if table_properties.is_empty() && with_options.is_empty() => match query {
Some(query) => {
- let primary_key = Self::primary_key_from_constraints(&constraints)?;
-
let plan = self.query_to_plan(*query, planner_context)?;
let input_schema = plan.schema();
@@ -163,10 +163,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan
};
+ let constraints = Constraints::new_from_table_constraints(
+ &constraints,
+ plan.schema(),
+ )?;
+
Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
- primary_key,
+ constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
@@ -175,19 +180,20 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
None => {
- let primary_key = Self::primary_key_from_constraints(&constraints)?;
-
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
let plan = EmptyRelation {
produce_one_row: false,
schema,
};
let plan = LogicalPlan::EmptyRelation(plan);
-
+ let constraints = Constraints::new_from_table_constraints(
+ &constraints,
+ plan.schema(),
+ )?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(name)?,
- primary_key,
+ constraints,
input: Arc::new(plan),
if_not_exists,
or_replace,
@@ -1160,54 +1166,4 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.get_table_provider(tables_reference)
.is_ok()
}
-
- fn primary_key_from_constraints(
- constraints: &[TableConstraint],
- ) -> Result<Vec<Column>> {
- let pk: Result<Vec<&Vec<Ident>>> = constraints
- .iter()
- .map(|c: &TableConstraint| match c {
- TableConstraint::Unique {
- columns,
- is_primary,
- ..
- } => match is_primary {
- true => Ok(columns),
- false => Err(DataFusionError::Plan(
- "Non-primary unique constraints are not supported".to_string(),
- )),
- },
- TableConstraint::ForeignKey { .. } => Err(DataFusionError::Plan(
- "Foreign key constraints are not currently supported".to_string(),
- )),
- TableConstraint::Check { .. } => Err(DataFusionError::Plan(
- "Check constraints are not currently supported".to_string(),
- )),
- TableConstraint::Index { .. } => Err(DataFusionError::Plan(
- "Indexes are not currently supported".to_string(),
- )),
- TableConstraint::FulltextOrSpatial { .. } => Err(DataFusionError::Plan(
- "Indexes are not currently supported".to_string(),
- )),
- })
- .collect();
- let pk = pk?;
- let pk = match pk.as_slice() {
- [] => return Ok(vec![]),
- [pk] => pk,
- _ => {
- return Err(DataFusionError::Plan(
- "Only one primary key is supported!".to_string(),
- ))?
- }
- };
- let primary_key: Vec<Column> = pk
- .iter()
- .map(|c| Column {
- relation: None,
- name: c.value.clone(),
- })
- .collect();
- Ok(primary_key)
- }
}
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index 6b498084a4..88dddc7336 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -232,7 +232,7 @@ fn cast_to_invalid_decimal_type_precision_lt_scale() {
fn plan_create_table_with_pk() {
let sql = "create table person (id int, name string, primary key(id))";
let plan = r#"
-CreateMemoryTable: Bare { table: "person" } primary_key=[id]
+CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0])]
EmptyRelation
"#
.trim();
@@ -251,10 +251,9 @@ CreateMemoryTable: Bare { table: "person" }
}
#[test]
-#[should_panic(expected = "Non-primary unique constraints are not supported")]
fn plan_create_table_check_constraint() {
let sql = "create table person (id int, name string, unique(id))";
- let plan = "";
+ let plan = "CreateMemoryTable: Bare { table: \"person\" } constraints=[Unique([0])]\n EmptyRelation";
quick_test(sql, plan);
}