You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/07/26 18:32:46 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #7040: Initial support for functional dependencies handling primary key and unique constraints

alamb commented on code in PR #7040:
URL: https://github.com/apache/arrow-datafusion/pull/7040#discussion_r1275325562


##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2847,3 +2847,390 @@ SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
 FRA [200.0, 50.0] 50 50
 GRC [80.0, 30.0] 30 30
 TUR [100.0, 75.0] 75 75
+
+# create a table for testing
+statement ok
+CREATE TABLE sales_global_with_pk (zip_code INT,
+          country VARCHAR(3),
+          sn INT,
+          ts TIMESTAMP,
+          currency VARCHAR(3),
+          amount FLOAT,
+          primary key(sn)
+        ) as VALUES
+          (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+          (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+          (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+          (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+          (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0)
+
+# create a table for testing, where primary key is composite
+statement ok
+CREATE TABLE sales_global_with_composite_pk (zip_code INT,
+          country VARCHAR(3),
+          sn INT,
+          ts TIMESTAMP,
+          currency VARCHAR(3),
+          amount FLOAT,
+          primary key(sn, ts)
+        ) as VALUES
+          (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+          (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+          (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+          (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+          (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0)
+
+# create a table for testing, where sn is unique key
+statement ok
+CREATE TABLE sales_global_with_unique (zip_code INT,
+          country VARCHAR(3),
+          sn INT,
+          ts TIMESTAMP,
+          currency VARCHAR(3),
+          amount FLOAT,
+          unique(sn)
+        ) as VALUES
+          (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+          (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+          (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+          (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+          (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0),
+          (1, 'TUR', NULL, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0)
+
+# when group by contains primary key expression
+# we can use all the expressions in the table during selection
+# (not just group by expressions + aggregation result)
+query TT
+EXPLAIN SELECT s.sn, s.amount, 2*s.sn
+  FROM sales_global_with_pk AS s
+  GROUP BY sn
+  ORDER BY sn
+----
+logical_plan
+Sort: s.sn ASC NULLS LAST
+--Projection: s.sn, s.amount, Int64(2) * CAST(s.sn AS Int64)
+----Aggregate: groupBy=[[s.sn, s.amount]], aggr=[[]]
+------SubqueryAlias: s
+--------TableScan: sales_global_with_pk projection=[sn, amount]
+physical_plan
+SortPreservingMergeExec: [sn@0 ASC NULLS LAST]
+--SortExec: expr=[sn@0 ASC NULLS LAST]
+----ProjectionExec: expr=[sn@0 as sn, amount@1 as amount, 2 * CAST(sn@0 AS Int64) as Int64(2) * s.sn]
+------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as amount], aggr=[]

Review Comment:
   If the plan is grouping on a unique column I think the `AggregateExec` could be avoided entirely because each group will have exactly one non null row
   
   Maybe that would be good optimization to add in a follow on PR
   
   



##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -222,8 +223,8 @@ impl Hash for CreateExternalTable {
 pub struct CreateMemoryTable {
     /// The table name
     pub name: OwnedTableReference,
-    /// The ordered list of columns in the primary key, or an empty vector if none
-    pub primary_key: Vec<Column>,
+    /// The list of constraints in the schema, such as primary key, unique, etc.
+    pub constraints: Vec<Constraint>,

Review Comment:
   👍 



##########
datafusion/sql/src/select.rs:
##########
@@ -431,6 +432,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         group_by_exprs: Vec<Expr>,
         aggr_exprs: Vec<Expr>,
     ) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)> {
+        let schema = input.schema();

Review Comment:
   Could you explain why the dependency logic needs to be in the SQL planner? It seems more general than just SQL (it should apply to dataframes as well)
   
   Also, this function calls `LogicalPlanBuilder::aggregate` which then eventally calls `Aggregate::try_new_with_schema` which also re-calculates functional dependency information
   
   Maybe this logic could be put in `Aggregate::try_new_with_schema` if it is not redundant



##########
datafusion/common/src/dfschema.rs:
##########
@@ -20,26 +20,315 @@
 
 use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
+use std::fmt::{Display, Formatter};
 use std::hash::Hash;
 use std::sync::Arc;
 
 use crate::error::{unqualified_field_not_found, DataFusionError, Result, SchemaError};
-use crate::{field_not_found, Column, OwnedTableReference, TableReference};
+use crate::{field_not_found, Column, JoinType, OwnedTableReference, TableReference};
 
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
-use std::fmt::{Display, Formatter};
 
 /// A reference-counted reference to a `DFSchema`.
 pub type DFSchemaRef = Arc<DFSchema>;
 
+/// This object defines a constraint on a table.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum Constraint {
+    /// Columns with the given indices form a composite primary key (they are
+    /// jointly unique and not nullable):
+    PrimaryKey(Vec<usize>),
+    /// Columns with the given indices form a composite unique key:
+    Unique(Vec<usize>),
+}
+

Review Comment:
   It might make the API cleaner to have a `Contraints` that is similar to `FunctionalDependencies`, so the API can be in terms of `Option<&Constraints>` rather than `Option<[&Constraint]>` 
   
   
   ```rust
   /// This object encapsulates a list of functional constraints:
   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct Constraints {
       inner: Vec<Constraint>,
   }
   



##########
datafusion/sql/tests/sql_integration.rs:
##########
@@ -232,7 +232,7 @@ fn cast_to_invalid_decimal_type_precision_lt_scale() {
 fn plan_create_table_with_pk() {
     let sql = "create table person (id int, name string, primary key(id))";
     let plan = r#"
-CreateMemoryTable: Bare { table: "person" } primary_key=[id]
+CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0])]

Review Comment:
   👍 



##########
datafusion/common/src/dfschema.rs:
##########
@@ -20,26 +20,315 @@
 
 use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
+use std::fmt::{Display, Formatter};
 use std::hash::Hash;
 use std::sync::Arc;
 
 use crate::error::{unqualified_field_not_found, DataFusionError, Result, SchemaError};
-use crate::{field_not_found, Column, OwnedTableReference, TableReference};
+use crate::{field_not_found, Column, JoinType, OwnedTableReference, TableReference};
 
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
-use std::fmt::{Display, Formatter};
 
 /// A reference-counted reference to a `DFSchema`.
 pub type DFSchemaRef = Arc<DFSchema>;
 
+/// This object defines a constraint on a table.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum Constraint {
+    /// Columns with the given indices form a composite primary key (they are
+    /// jointly unique and not nullable):
+    PrimaryKey(Vec<usize>),
+    /// Columns with the given indices form a composite unique key:
+    Unique(Vec<usize>),
+}
+
+/// This object defines a functional dependence in the schema. A functional

Review Comment:
   Both `Constraint` and `FunctionalDependencies` might be easier to find if they were in their own modules / .rs files



##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -222,8 +223,8 @@ impl Hash for CreateExternalTable {
 pub struct CreateMemoryTable {
     /// The table name
     pub name: OwnedTableReference,
-    /// The ordered list of columns in the primary key, or an empty vector if none
-    pub primary_key: Vec<Column>,
+    /// The list of constraints in the schema, such as primary key, unique, etc.
+    pub constraints: Vec<Constraint>,

Review Comment:
   👍 



##########
datafusion/sql/src/statement.rs:
##########
@@ -1163,20 +1165,40 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
     fn primary_key_from_constraints(

Review Comment:
   🤔  maybe this function would be more clearly if it were renamed to reflect its expanded role for Constrants not just primary keys
   
   Like
   
   ```suggestion
       fn constrants_from_constraints(
   ```
   
   ?



##########
datafusion/sql/tests/sql_integration.rs:
##########
@@ -232,7 +232,7 @@ fn cast_to_invalid_decimal_type_precision_lt_scale() {
 fn plan_create_table_with_pk() {
     let sql = "create table person (id int, name string, primary key(id))";
     let plan = r#"
-CreateMemoryTable: Bare { table: "person" } primary_key=[id]
+CreateMemoryTable: Bare { table: "person" } constraints=[PrimaryKey([0])]

Review Comment:
   👍 



##########
datafusion/sql/src/statement.rs:
##########
@@ -1163,20 +1165,40 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
     fn primary_key_from_constraints(

Review Comment:
   🤔  maybe this function would be more clearly if it were renamed to reflect its expanded role for Constrants not just primary keys
   
   Like
   
   ```suggestion
       fn constrants_from_constraints(
   ```
   
   ?



##########
datafusion/optimizer/src/analyzer/count_wildcard_rule.rs:
##########
@@ -263,7 +263,9 @@ fn rewrite_schema(schema: &DFSchema) -> DFSchemaRef {
         })
         .collect::<Vec<DFField>>();
     DFSchemaRef::new(
-        DFSchema::new_with_metadata(new_fields, schema.metadata().clone()).unwrap(),
+        DFSchema::new_with_metadata(new_fields, schema.metadata().clone())

Review Comment:
   Constructing DFSchema's is unfortunately awkward but I don't think that is the fault of this PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org