You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by av...@apache.org on 2023/04/03 17:49:58 UTC

[arrow-datafusion] branch main updated: Add primary key information to CreateMemoryTable LogicalPlan node (#5835)

This is an automated email from the ASF dual-hosted git repository.

avantgardner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 536f2b8250 Add primary key information to CreateMemoryTable LogicalPlan node (#5835)
536f2b8250 is described below

commit 536f2b8250dcadd083f07541d6f5010414310300
Author: Brent Gardner <bg...@squarelabs.net>
AuthorDate: Mon Apr 3 11:49:53 2023 -0600

    Add primary key information to CreateMemoryTable LogicalPlan node (#5835)
    
    Add primary key information to CreateMemoryTable LogicalPlan node (#5835)
---
 datafusion/core/src/execution/context.rs |   8 ++
 datafusion/expr/src/logical_plan/plan.rs |  14 ++-
 datafusion/expr/src/utils.rs             |   1 +
 datafusion/sql/src/statement.rs          | 164 ++++++++++++++++++++-----------
 datafusion/sql/tests/integration_test.rs |  30 ++++++
 5 files changed, 157 insertions(+), 60 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 5dc599c2d5..c9ad030991 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -344,7 +344,15 @@ impl SessionContext {
                 input,
                 if_not_exists,
                 or_replace,
+                primary_key,
             }) => {
+                if !primary_key.is_empty() {
+                    Err(DataFusionError::Execution(
+                        "Primary keys on MemoryTables are not currently supported!"
+                            .to_string(),
+                    ))?;
+                }
+
                 let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
                 let table = self.table(&name).await;
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 75dc3f977d..b4fc9edd6b 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1046,9 +1046,17 @@ impl LogicalPlan {
                         write!(f, "CreateExternalTable: {name:?}")
                     }
                     LogicalPlan::CreateMemoryTable(CreateMemoryTable {
-                        name, ..
+                        name,
+                        primary_key,
+                        ..
                     }) => {
-                        write!(f, "CreateMemoryTable: {name:?}")
+                        let pk: Vec<String> =
+                            primary_key.iter().map(|c| c.name.to_string()).collect();
+                        let mut pk = pk.join(", ");
+                        if !pk.is_empty() {
+                            pk = format!(" primary_key=[{pk}]");
+                        }
+                        write!(f, "CreateMemoryTable: {name:?}{pk}")
                     }
                     LogicalPlan::CreateView(CreateView { name, .. }) => {
                         write!(f, "CreateView: {name:?}")
@@ -1490,6 +1498,8 @@ pub struct Union {
 pub struct CreateMemoryTable {
     /// The table name
     pub name: OwnedTableReference,
+    /// The ordered list of columns in the primary key, or an empty vector if none
+    pub primary_key: Vec<Column>,
     /// The logical plan
     pub input: Arc<LogicalPlan>,
     /// Option to not error if table already exists
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 38ed0d99ac..726365f574 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -843,6 +843,7 @@ pub fn from_plan(
             ..
         }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
             input: Arc::new(inputs[0].clone()),
+            primary_key: vec![],
             name: name.clone(),
             if_not_exists: *if_not_exists,
             or_replace: *or_replace,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 695fa13ae3..888aa3e8ec 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -45,8 +45,8 @@ use datafusion_expr::{
 use sqlparser::ast;
 use sqlparser::ast::{
     Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, OrderByExpr, Query,
-    SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement, TableFactor,
-    TableWithJoins, TransactionMode, UnaryOperator, Value,
+    SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement,
+    TableConstraint, TableFactor, TableWithJoins, TransactionMode, UnaryOperator, Value,
 };
 
 use sqlparser::parser::ParserError::ParserError;
@@ -128,69 +128,67 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 if_not_exists,
                 or_replace,
                 ..
-            } if constraints.is_empty()
-                && table_properties.is_empty()
-                && with_options.is_empty() =>
-            {
-                match query {
-                    Some(query) => {
-                        let plan = self.query_to_plan(*query, planner_context)?;
-                        let input_schema = plan.schema();
-
-                        let plan = if !columns.is_empty() {
-                            let schema = self.build_schema(columns)?.to_dfschema_ref()?;
-                            if schema.fields().len() != input_schema.fields().len() {
-                                return Err(DataFusionError::Plan(format!(
+            } if table_properties.is_empty() && with_options.is_empty() => match query {
+                Some(query) => {
+                    let primary_key = Self::primary_key_from_constraints(&constraints)?;
+
+                    let plan = self.query_to_plan(*query, planner_context)?;
+                    let input_schema = plan.schema();
+
+                    let plan = if !columns.is_empty() {
+                        let schema = self.build_schema(columns)?.to_dfschema_ref()?;
+                        if schema.fields().len() != input_schema.fields().len() {
+                            return Err(DataFusionError::Plan(format!(
                             "Mismatch: {} columns specified, but result has {} columns",
                             schema.fields().len(),
                             input_schema.fields().len()
                         )));
-                            }
-                            let input_fields = input_schema.fields();
-                            let project_exprs = schema
-                                .fields()
-                                .iter()
-                                .zip(input_fields)
-                                .map(|(field, input_field)| {
-                                    cast(
-                                        col(input_field.name()),
-                                        field.data_type().clone(),
-                                    )
+                        }
+                        let input_fields = input_schema.fields();
+                        let project_exprs = schema
+                            .fields()
+                            .iter()
+                            .zip(input_fields)
+                            .map(|(field, input_field)| {
+                                cast(col(input_field.name()), field.data_type().clone())
                                     .alias(field.name())
-                                })
-                                .collect::<Vec<_>>();
-                            LogicalPlanBuilder::from(plan.clone())
-                                .project(project_exprs)?
-                                .build()?
-                        } else {
-                            plan
-                        };
-
-                        Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
-                            name: self.object_name_to_table_reference(name)?,
-                            input: Arc::new(plan),
-                            if_not_exists,
-                            or_replace,
-                        }))
-                    }
+                            })
+                            .collect::<Vec<_>>();
+                        LogicalPlanBuilder::from(plan.clone())
+                            .project(project_exprs)?
+                            .build()?
+                    } else {
+                        plan
+                    };
+
+                    Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+                        name: self.object_name_to_table_reference(name)?,
+                        primary_key,
+                        input: Arc::new(plan),
+                        if_not_exists,
+                        or_replace,
+                    }))
+                }
 
-                    None => {
-                        let schema = self.build_schema(columns)?.to_dfschema_ref()?;
-                        let plan = EmptyRelation {
-                            produce_one_row: false,
-                            schema,
-                        };
-                        let plan = LogicalPlan::EmptyRelation(plan);
-
-                        Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
-                            name: self.object_name_to_table_reference(name)?,
-                            input: Arc::new(plan),
-                            if_not_exists,
-                            or_replace,
-                        }))
-                    }
+                None => {
+                    let primary_key = Self::primary_key_from_constraints(&constraints)?;
+
+                    let schema = self.build_schema(columns)?.to_dfschema_ref()?;
+                    let plan = EmptyRelation {
+                        produce_one_row: false,
+                        schema,
+                    };
+                    let plan = LogicalPlan::EmptyRelation(plan);
+
+                    Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+                        name: self.object_name_to_table_reference(name)?,
+                        primary_key,
+                        input: Arc::new(plan),
+                        if_not_exists,
+                        or_replace,
+                    }))
                 }
-            }
+            },
 
             Statement::CreateView {
                 or_replace,
@@ -1076,4 +1074,54 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .get_table_provider(tables_reference)
             .is_ok()
     }
+
+    /// Extracts the ordered primary-key columns from `CREATE TABLE` constraints (empty
+    /// if none); errors on any constraint other than a single `PRIMARY KEY`.
+    fn primary_key_from_constraints(
+        constraints: &[TableConstraint],
+    ) -> Result<Vec<Column>> {
+        let pk: Vec<&Vec<Ident>> = constraints
+            .iter()
+            .map(|c: &TableConstraint| match c {
+                TableConstraint::Unique {
+                    columns,
+                    is_primary,
+                    ..
+                } => match is_primary {
+                    true => Ok(columns),
+                    false => Err(DataFusionError::Plan(
+                        "Non-primary unique constraints are not supported".to_string(),
+                    )),
+                },
+                TableConstraint::ForeignKey { .. } => Err(DataFusionError::Plan(
+                    "Foreign key constraints are not currently supported".to_string(),
+                )),
+                TableConstraint::Check { .. } => Err(DataFusionError::Plan(
+                    "Check constraints are not currently supported".to_string(),
+                )),
+                TableConstraint::Index { .. } => Err(DataFusionError::Plan(
+                    "Indexes are not currently supported".to_string(),
+                )),
+                TableConstraint::FulltextOrSpatial { .. } => Err(DataFusionError::Plan(
+                    "Fulltext/spatial indexes are not currently supported".to_string(),
+                )),
+            })
+            .collect::<Result<_>>()?;
+        let pk = match pk.as_slice() {
+            [] => return Ok(vec![]),
+            [pk] => pk,
+            _ => {
+                return Err(DataFusionError::Plan(
+                    "Only one primary key is supported!".to_string(),
+                ))
+            }
+        };
+        Ok(pk
+            .iter()
+            .map(|c| Column {
+                relation: None,
+                name: c.value.clone(),
+            })
+            .collect())
+    }
 }
diff --git a/datafusion/sql/tests/integration_test.rs b/datafusion/sql/tests/integration_test.rs
index 85ef4c3c51..bbfb56a2b3 100644
--- a/datafusion/sql/tests/integration_test.rs
+++ b/datafusion/sql/tests/integration_test.rs
@@ -199,6 +199,36 @@ fn cast_to_invalid_decimal_type() {
     }
 }
 
+#[test]
+fn plan_create_table_with_pk() {
+    let sql = "create table person (id int, name string, primary key(id))";
+    let expected = r#"
+CreateMemoryTable: Bare { table: "person" } primary_key=[id]
+  EmptyRelation
+    "#
+    .trim();
+    quick_test(sql, expected);
+}
+
+#[test]
+fn plan_create_table_no_pk() {
+    let sql = "create table person (id int, name string)";
+    let expected = r#"
+CreateMemoryTable: Bare { table: "person" }
+  EmptyRelation
+    "#
+    .trim();
+    quick_test(sql, expected);
+}
+
+#[test]
+#[should_panic(expected = "Non-primary unique constraints are not supported")]
+fn plan_create_table_unique_constraint() {
+    let sql = "create table person (id int, name string, unique(id))";
+    // No plan text is expected: planning must fail with the error above.
+    quick_test(sql, "");
+}
+
 #[test]
 fn plan_start_transaction() {
     let sql = "start transaction";