You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original archive page and is not preserved in this rendering.
Posted to commits@arrow.apache.org by av...@apache.org on 2023/04/03 17:49:58 UTC
[arrow-datafusion] branch main updated: Add primary key information to CreateMemoryTable LogicalPlan node (#5835)
This is an automated email from the ASF dual-hosted git repository.
avantgardner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 536f2b8250 Add primary key information to CreateMemoryTable LogicalPlan node (#5835)
536f2b8250 is described below
commit 536f2b8250dcadd083f07541d6f5010414310300
Author: Brent Gardner <bg...@squarelabs.net>
AuthorDate: Mon Apr 3 11:49:53 2023 -0600
Add primary key information to CreateMemoryTable LogicalPlan node (#5835)
Add primary key information to CreateMemoryTable LogicalPlan node (#5835)
---
datafusion/core/src/execution/context.rs | 8 ++
datafusion/expr/src/logical_plan/plan.rs | 14 ++-
datafusion/expr/src/utils.rs | 1 +
datafusion/sql/src/statement.rs | 164 ++++++++++++++++++++-----------
datafusion/sql/tests/integration_test.rs | 30 ++++++
5 files changed, 157 insertions(+), 60 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 5dc599c2d5..c9ad030991 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -344,7 +344,15 @@ impl SessionContext {
input,
if_not_exists,
or_replace,
+ primary_key,
}) => {
+ if !primary_key.is_empty() {
+ Err(DataFusionError::Execution(
+ "Primary keys on MemoryTables are not currently supported!"
+ .to_string(),
+ ))?;
+ }
+
let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
let table = self.table(&name).await;
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 75dc3f977d..b4fc9edd6b 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1046,9 +1046,17 @@ impl LogicalPlan {
write!(f, "CreateExternalTable: {name:?}")
}
LogicalPlan::CreateMemoryTable(CreateMemoryTable {
- name, ..
+ name,
+ primary_key,
+ ..
}) => {
- write!(f, "CreateMemoryTable: {name:?}")
+ let pk: Vec<String> =
+ primary_key.iter().map(|c| c.name.to_string()).collect();
+ let mut pk = pk.join(", ");
+ if !pk.is_empty() {
+ pk = format!(" primary_key=[{pk}]");
+ }
+ write!(f, "CreateMemoryTable: {name:?}{pk}")
}
LogicalPlan::CreateView(CreateView { name, .. }) => {
write!(f, "CreateView: {name:?}")
@@ -1490,6 +1498,8 @@ pub struct Union {
pub struct CreateMemoryTable {
/// The table name
pub name: OwnedTableReference,
+ /// The ordered list of columns in the primary key, or an empty vector if none
+ pub primary_key: Vec<Column>,
/// The logical plan
pub input: Arc<LogicalPlan>,
/// Option to not error if table already exists
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 38ed0d99ac..726365f574 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -843,6 +843,7 @@ pub fn from_plan(
..
}) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
input: Arc::new(inputs[0].clone()),
+ primary_key: vec![],
name: name.clone(),
if_not_exists: *if_not_exists,
or_replace: *or_replace,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 695fa13ae3..888aa3e8ec 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -45,8 +45,8 @@ use datafusion_expr::{
use sqlparser::ast;
use sqlparser::ast::{
Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, OrderByExpr, Query,
- SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement, TableFactor,
- TableWithJoins, TransactionMode, UnaryOperator, Value,
+ SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement,
+ TableConstraint, TableFactor, TableWithJoins, TransactionMode, UnaryOperator, Value,
};
use sqlparser::parser::ParserError::ParserError;
@@ -128,69 +128,67 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if_not_exists,
or_replace,
..
- } if constraints.is_empty()
- && table_properties.is_empty()
- && with_options.is_empty() =>
- {
- match query {
- Some(query) => {
- let plan = self.query_to_plan(*query, planner_context)?;
- let input_schema = plan.schema();
-
- let plan = if !columns.is_empty() {
- let schema = self.build_schema(columns)?.to_dfschema_ref()?;
- if schema.fields().len() != input_schema.fields().len() {
- return Err(DataFusionError::Plan(format!(
+ } if table_properties.is_empty() && with_options.is_empty() => match query {
+ Some(query) => {
+ let primary_key = Self::primary_key_from_constraints(&constraints)?;
+
+ let plan = self.query_to_plan(*query, planner_context)?;
+ let input_schema = plan.schema();
+
+ let plan = if !columns.is_empty() {
+ let schema = self.build_schema(columns)?.to_dfschema_ref()?;
+ if schema.fields().len() != input_schema.fields().len() {
+ return Err(DataFusionError::Plan(format!(
"Mismatch: {} columns specified, but result has {} columns",
schema.fields().len(),
input_schema.fields().len()
)));
- }
- let input_fields = input_schema.fields();
- let project_exprs = schema
- .fields()
- .iter()
- .zip(input_fields)
- .map(|(field, input_field)| {
- cast(
- col(input_field.name()),
- field.data_type().clone(),
- )
+ }
+ let input_fields = input_schema.fields();
+ let project_exprs = schema
+ .fields()
+ .iter()
+ .zip(input_fields)
+ .map(|(field, input_field)| {
+ cast(col(input_field.name()), field.data_type().clone())
.alias(field.name())
- })
- .collect::<Vec<_>>();
- LogicalPlanBuilder::from(plan.clone())
- .project(project_exprs)?
- .build()?
- } else {
- plan
- };
-
- Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
- name: self.object_name_to_table_reference(name)?,
- input: Arc::new(plan),
- if_not_exists,
- or_replace,
- }))
- }
+ })
+ .collect::<Vec<_>>();
+ LogicalPlanBuilder::from(plan.clone())
+ .project(project_exprs)?
+ .build()?
+ } else {
+ plan
+ };
+
+ Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+ name: self.object_name_to_table_reference(name)?,
+ primary_key,
+ input: Arc::new(plan),
+ if_not_exists,
+ or_replace,
+ }))
+ }
- None => {
- let schema = self.build_schema(columns)?.to_dfschema_ref()?;
- let plan = EmptyRelation {
- produce_one_row: false,
- schema,
- };
- let plan = LogicalPlan::EmptyRelation(plan);
-
- Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
- name: self.object_name_to_table_reference(name)?,
- input: Arc::new(plan),
- if_not_exists,
- or_replace,
- }))
- }
+ None => {
+ let primary_key = Self::primary_key_from_constraints(&constraints)?;
+
+ let schema = self.build_schema(columns)?.to_dfschema_ref()?;
+ let plan = EmptyRelation {
+ produce_one_row: false,
+ schema,
+ };
+ let plan = LogicalPlan::EmptyRelation(plan);
+
+ Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+ name: self.object_name_to_table_reference(name)?,
+ primary_key,
+ input: Arc::new(plan),
+ if_not_exists,
+ or_replace,
+ }))
}
- }
+ },
Statement::CreateView {
or_replace,
@@ -1076,4 +1074,54 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.get_table_provider(tables_reference)
.is_ok()
}
+
+ fn primary_key_from_constraints(
+ constraints: &[TableConstraint],
+ ) -> Result<Vec<Column>> {
+ let pk: Result<Vec<&Vec<Ident>>> = constraints
+ .iter()
+ .map(|c: &TableConstraint| match c {
+ TableConstraint::Unique {
+ columns,
+ is_primary,
+ ..
+ } => match is_primary {
+ true => Ok(columns),
+ false => Err(DataFusionError::Plan(
+ "Non-primary unique constraints are not supported".to_string(),
+ )),
+ },
+ TableConstraint::ForeignKey { .. } => Err(DataFusionError::Plan(
+ "Foreign key constraints are not currently supported".to_string(),
+ )),
+ TableConstraint::Check { .. } => Err(DataFusionError::Plan(
+ "Check constraints are not currently supported".to_string(),
+ )),
+ TableConstraint::Index { .. } => Err(DataFusionError::Plan(
+ "Indexes are not currently supported".to_string(),
+ )),
+ TableConstraint::FulltextOrSpatial { .. } => Err(DataFusionError::Plan(
+ "Indexes are not currently supported".to_string(),
+ )),
+ })
+ .collect();
+ let pk = pk?;
+ let pk = match pk.as_slice() {
+ [] => return Ok(vec![]),
+ [pk] => pk,
+ _ => {
+ return Err(DataFusionError::Plan(
+ "Only one primary key is supported!".to_string(),
+ ))?
+ }
+ };
+ let primary_key: Vec<Column> = pk
+ .iter()
+ .map(|c| Column {
+ relation: None,
+ name: c.value.clone(),
+ })
+ .collect();
+ Ok(primary_key)
+ }
}
diff --git a/datafusion/sql/tests/integration_test.rs b/datafusion/sql/tests/integration_test.rs
index 85ef4c3c51..bbfb56a2b3 100644
--- a/datafusion/sql/tests/integration_test.rs
+++ b/datafusion/sql/tests/integration_test.rs
@@ -199,6 +199,36 @@ fn cast_to_invalid_decimal_type() {
}
}
+#[test]
+fn plan_create_table_with_pk() {
+ let sql = "create table person (id int, name string, primary key(id))";
+ let plan = r#"
+CreateMemoryTable: Bare { table: "person" } primary_key=[id]
+ EmptyRelation
+ "#
+ .trim();
+ quick_test(sql, plan);
+}
+
+#[test]
+fn plan_create_table_no_pk() {
+ let sql = "create table person (id int, name string)";
+ let plan = r#"
+CreateMemoryTable: Bare { table: "person" }
+ EmptyRelation
+ "#
+ .trim();
+ quick_test(sql, plan);
+}
+
+#[test]
+#[should_panic(expected = "Non-primary unique constraints are not supported")]
+fn plan_create_table_check_constraint() {
+ let sql = "create table person (id int, name string, unique(id))";
+ let plan = "";
+ quick_test(sql, plan);
+}
+
#[test]
fn plan_start_transaction() {
let sql = "start transaction";