You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/05 18:01:41 UTC
[arrow-datafusion] branch master updated: Add IF NOT EXISTS to `CREATE TABLE` and `CREATE EXTERNAL TABLE` (#2143)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 823011590 Add IF NOT EXISTS to `CREATE TABLE` and `CREATE EXTERNAL TABLE` (#2143)
823011590 is described below
commit 823011590f4509179806a8f8b02aef5bdcbfaec8
Author: Matthew Turner <ma...@outlook.com>
AuthorDate: Tue Apr 5 14:01:36 2022 -0400
Add IF NOT EXISTS to `CREATE TABLE` and `CREATE EXTERNAL TABLE` (#2143)
* Add IF NOT EXISTS to CREATE EXTERNAL TABLE
* Add test
* Add IF NOT EXISTS ("ine") for create table
* Add sql tests
* Clippy
---
ballista/rust/client/src/context.rs | 88 ++++++++++++---------
ballista/rust/core/proto/ballista.proto | 1 +
ballista/rust/core/src/serde/logical_plan/mod.rs | 4 +
datafusion/core/src/execution/context.rs | 97 ++++++++++++++++--------
datafusion/core/src/logical_plan/plan.rs | 4 +
datafusion/core/src/optimizer/utils.rs | 15 ++--
datafusion/core/src/sql/parser.rs | 26 +++++++
datafusion/core/src/sql/planner.rs | 4 +
datafusion/core/tests/sql/create_drop.rs | 65 ++++++++++++++++
9 files changed, 230 insertions(+), 74 deletions(-)
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 0a002e888..451eac25a 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -376,44 +376,58 @@ impl BallistaContext {
ref file_type,
ref has_header,
ref table_partition_cols,
- }) => match file_type {
- FileType::CSV => {
- self.register_csv(
- name,
- location,
- CsvReadOptions::new()
- .schema(&schema.as_ref().to_owned().into())
- .has_header(*has_header)
- .table_partition_cols(table_partition_cols.to_vec()),
- )
- .await?;
- Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
- }
- FileType::Parquet => {
- self.register_parquet(
- name,
- location,
- ParquetReadOptions::default()
- .table_partition_cols(table_partition_cols.to_vec()),
- )
- .await?;
- Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
+ ref if_not_exists,
+ }) => {
+ let table_exists = ctx.table_exist(name.as_str())?;
+
+ match (if_not_exists, table_exists) {
+ (_, false) => match file_type {
+ FileType::CSV => {
+ self.register_csv(
+ name,
+ location,
+ CsvReadOptions::new()
+ .schema(&schema.as_ref().to_owned().into())
+ .has_header(*has_header)
+ .table_partition_cols(table_partition_cols.to_vec()),
+ )
+ .await?;
+ Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
+ }
+ FileType::Parquet => {
+ self.register_parquet(
+ name,
+ location,
+ ParquetReadOptions::default()
+ .table_partition_cols(table_partition_cols.to_vec()),
+ )
+ .await?;
+ Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
+ }
+ FileType::Avro => {
+ self.register_avro(
+ name,
+ location,
+ AvroReadOptions::default()
+ .table_partition_cols(table_partition_cols.to_vec()),
+ )
+ .await?;
+ Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
+ }
+ _ => Err(DataFusionError::NotImplemented(format!(
+ "Unsupported file type {:?}.",
+ file_type
+ ))),
+ },
+ (true, true) => {
+ Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
+ }
+ (false, true) => Err(DataFusionError::Execution(format!(
+ "Table '{:?}' already exists",
+ name
+ ))),
}
- FileType::Avro => {
- self.register_avro(
- name,
- location,
- AvroReadOptions::default()
- .table_partition_cols(table_partition_cols.to_vec()),
- )
- .await?;
- Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
- }
- _ => Err(DataFusionError::NotImplemented(format!(
- "Unsupported file type {:?}.",
- file_type
- ))),
- },
+ }
_ => ctx.sql(sql).await,
}
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 7f96a00ea..228c8bee2 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -147,6 +147,7 @@ message CreateExternalTableNode {
bool has_header = 4;
datafusion.DfSchema schema = 5;
repeated string table_partition_cols = 6;
+ bool if_not_exists = 7;
}
message CreateCatalogSchemaNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 3aed2db00..490b2ebec 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -328,6 +328,7 @@ impl AsLogicalPlan for LogicalPlanNode {
table_partition_cols: create_extern_table
.table_partition_cols
.clone(),
+ if_not_exists: create_extern_table.if_not_exists,
}))
}
LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
@@ -775,6 +776,7 @@ impl AsLogicalPlan for LogicalPlanNode {
has_header,
schema: df_schema,
table_partition_cols,
+ if_not_exists,
}) => {
use datafusion::sql::parser::FileType;
@@ -794,6 +796,7 @@ impl AsLogicalPlan for LogicalPlanNode {
has_header: *has_header,
schema: Some(df_schema.into()),
table_partition_cols: table_partition_cols.clone(),
+ if_not_exists: *if_not_exists,
},
)),
})
@@ -1129,6 +1132,7 @@ mod roundtrip_tests {
file_type: *file,
has_header: true,
table_partition_cols: vec![],
+ if_not_exists: false,
});
roundtrip_test!(create_table_node);
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ab65d28f7..04129ccea 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -218,6 +218,7 @@ impl SessionContext {
ref file_type,
ref has_header,
ref table_partition_cols,
+ ref if_not_exists,
}) => {
let (file_format, file_extension) = match file_type {
FileType::CSV => (
@@ -238,40 +239,74 @@ impl SessionContext {
DEFAULT_JSON_EXTENSION,
),
};
-
- let options = ListingOptions {
- format: file_format,
- collect_stat: false,
- file_extension: file_extension.to_owned(),
- target_partitions: self.copied_config().target_partitions,
- table_partition_cols: table_partition_cols.clone(),
- };
-
- // TODO make schema in CreateExternalTable optional instead of empty
- let provided_schema = if schema.fields().is_empty() {
- None
- } else {
- Some(Arc::new(schema.as_ref().to_owned().into()))
- };
- self.register_listing_table(name, location, options, provided_schema)
- .await?;
- let plan = LogicalPlanBuilder::empty(false).build()?;
- Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ let table = self.table(name.as_str());
+ match (if_not_exists, table) {
+ (true, Ok(_)) => {
+ let plan = LogicalPlanBuilder::empty(false).build()?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ (_, Err(_)) => {
+ // TODO make schema in CreateExternalTable optional instead of empty
+ let provided_schema = if schema.fields().is_empty() {
+ None
+ } else {
+ Some(Arc::new(schema.as_ref().to_owned().into()))
+ };
+ let options = ListingOptions {
+ format: file_format,
+ collect_stat: false,
+ file_extension: file_extension.to_owned(),
+ target_partitions: self.copied_config().target_partitions,
+ table_partition_cols: table_partition_cols.clone(),
+ };
+ self.register_listing_table(
+ name,
+ location,
+ options,
+ provided_schema,
+ )
+ .await?;
+ let plan = LogicalPlanBuilder::empty(false).build()?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ (false, Ok(_)) => Err(DataFusionError::Execution(format!(
+ "Table '{:?}' already exists",
+ name
+ ))),
+ }
}
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, input }) => {
- let plan = self.optimize(&input)?;
- let physical = Arc::new(DataFrame::new(self.state.clone(), &plan));
-
- let batches: Vec<_> = physical.collect_partitioned().await?;
- let table = Arc::new(MemTable::try_new(
- Arc::new(plan.schema().as_ref().into()),
- batches,
- )?);
- self.register_table(name.as_str(), table)?;
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+ name,
+ input,
+ if_not_exists,
+ }) => {
+ let table = self.table(name.as_str());
- let plan = LogicalPlanBuilder::empty(false).build()?;
- Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ match (if_not_exists, table) {
+ (true, Ok(_)) => {
+ let plan = LogicalPlanBuilder::empty(false).build()?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ (_, Err(_)) => {
+ let plan = self.optimize(&input)?;
+ let physical =
+ Arc::new(DataFrame::new(self.state.clone(), &plan));
+
+ let batches: Vec<_> = physical.collect_partitioned().await?;
+ let table = Arc::new(MemTable::try_new(
+ Arc::new(plan.schema().as_ref().into()),
+ batches,
+ )?);
+
+ self.register_table(name.as_str(), table)?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ (false, Ok(_)) => Err(DataFusionError::Execution(format!(
+ "Table '{:?}' already exists",
+ name
+ ))),
+ }
}
LogicalPlan::DropTable(DropTable {
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index eeb06d559..327f4d764 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -169,6 +169,8 @@ pub struct CreateMemoryTable {
pub name: String,
/// The logical plan
pub input: Arc<LogicalPlan>,
+ /// Option to not error if table already exists
+ pub if_not_exists: bool,
}
/// Creates an external table.
@@ -186,6 +188,8 @@ pub struct CreateExternalTable {
pub has_header: bool,
/// Partition Columns
pub table_partition_cols: Vec<String>,
+ /// Option to not error if table already exists
+ pub if_not_exists: bool,
}
/// Creates a schema.
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index 14e5ffa8c..f096cfb5a 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -226,12 +226,15 @@ pub fn from_plan(
n: *n,
input: Arc::new(inputs[0].clone()),
})),
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, .. }) => {
- Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
- input: Arc::new(inputs[0].clone()),
- name: name.clone(),
- }))
- }
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+ name,
+ if_not_exists,
+ ..
+ }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+ input: Arc::new(inputs[0].clone()),
+ name: name.clone(),
+ if_not_exists: *if_not_exists,
+ })),
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
node: e.node.from_template(expr, inputs),
})),
diff --git a/datafusion/core/src/sql/parser.rs b/datafusion/core/src/sql/parser.rs
index 566e91c92..29ad9f8cd 100644
--- a/datafusion/core/src/sql/parser.rs
+++ b/datafusion/core/src/sql/parser.rs
@@ -80,6 +80,8 @@ pub struct CreateExternalTable {
pub location: String,
/// Partition Columns
pub table_partition_cols: Vec<String>,
+ /// Option to not error if table already exists
+ pub if_not_exists: bool,
}
/// DataFusion Statement representations.
@@ -298,6 +300,9 @@ impl<'a> DFParser<'a> {
fn parse_create_external_table(&mut self) -> Result<Statement, ParserError> {
self.parser.expect_keyword(Keyword::TABLE)?;
+ let if_not_exists =
+ self.parser
+ .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
let table_name = self.parser.parse_object_name()?;
let (columns, _) = self.parse_columns()?;
self.parser
@@ -324,6 +329,7 @@ impl<'a> DFParser<'a> {
has_header,
location,
table_partition_cols,
+ if_not_exists,
};
Ok(Statement::CreateExternalTable(create))
}
@@ -420,6 +426,7 @@ mod tests {
has_header: false,
location: "foo.csv".into(),
table_partition_cols: vec![],
+ if_not_exists: false,
});
expect_parse_ok(sql, expected)?;
@@ -433,6 +440,7 @@ mod tests {
has_header: false,
location: "foo.csv".into(),
table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
+ if_not_exists: false,
});
expect_parse_ok(sql, expected)?;
@@ -449,6 +457,7 @@ mod tests {
has_header: true,
location: "foo.csv".into(),
table_partition_cols: vec![],
+ if_not_exists: false,
});
expect_parse_ok(sql, expected)?;
}
@@ -462,6 +471,7 @@ mod tests {
has_header: false,
location: "foo.parquet".into(),
table_partition_cols: vec![],
+ if_not_exists: false,
});
expect_parse_ok(sql, expected)?;
@@ -474,6 +484,7 @@ mod tests {
has_header: false,
location: "foo.parquet".into(),
table_partition_cols: vec![],
+ if_not_exists: false,
});
expect_parse_ok(sql, expected)?;
@@ -486,6 +497,21 @@ mod tests {
has_header: false,
location: "foo.avro".into(),
table_partition_cols: vec![],
+ if_not_exists: false,
+ });
+ expect_parse_ok(sql, expected)?;
+
+ // positive case: IF NOT EXISTS is accepted and sets `if_not_exists` to true
+ let sql =
+ "CREATE EXTERNAL TABLE IF NOT EXISTS t STORED AS PARQUET LOCATION 'foo.parquet'";
+ let expected = Statement::CreateExternalTable(CreateExternalTable {
+ name: "t".into(),
+ columns: vec![],
+ file_type: FileType::Parquet,
+ has_header: false,
+ location: "foo.parquet".into(),
+ table_partition_cols: vec![],
+ if_not_exists: true,
});
expect_parse_ok(sql, expected)?;
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index b59444ed2..770d16872 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -156,6 +156,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
constraints,
table_properties,
with_options,
+ if_not_exists,
..
} if columns.is_empty()
&& constraints.is_empty()
@@ -167,6 +168,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name: name.to_string(),
input: Arc::new(plan),
+ if_not_exists,
}))
}
Statement::CreateTable { .. } => Err(DataFusionError::NotImplemented(
@@ -309,6 +311,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
has_header,
location,
table_partition_cols,
+ if_not_exists,
} = statement;
// semantic checks
@@ -335,6 +338,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
file_type,
has_header,
table_partition_cols,
+ if_not_exists,
}))
}
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 748bd7ae9..6b1769751 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -126,3 +126,68 @@ async fn create_external_table_with_timestamps() {
];
assert_batches_sorted_eq!(expected, &result);
}
+
+#[tokio::test]
+#[should_panic(expected = "already exists")]
+async fn sql_create_duplicate_table() {
+ // the information schema used to introduce cyclic Arcs
+ let ctx =
+ SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+ // Create table
+ ctx.sql("CREATE TABLE y AS VALUES (1,2,3)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ // Create table again
+ let result = ctx
+ .sql("CREATE TABLE y AS VALUES (1,2,3)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ assert_eq!(result, Vec::new());
+}
+
+#[tokio::test]
+async fn sql_create_table_if_not_exists() -> Result<()> {
+ // the information schema used to introduce cyclic Arcs
+ let ctx =
+ SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+ // Create table
+ ctx.sql("CREATE TABLE y AS VALUES (1,2,3)")
+ .await?
+ .collect()
+ .await?;
+
+ // Create table again
+ let result = ctx
+ .sql("CREATE TABLE IF NOT EXISTS y AS VALUES (1,2,3)")
+ .await?
+ .collect()
+ .await?;
+
+ assert_eq!(result, Vec::new());
+
+ // Create external table
+ ctx.sql("CREATE EXTERNAL TABLE aggregate_simple STORED AS CSV WITH HEADER ROW LOCATION 'tests/aggregate_simple.csv'")
+ .await?
+ .collect()
+ .await?;
+
+ // Create the same external table again with IF NOT EXISTS; this should not error
+ let result = ctx.sql("CREATE EXTERNAL TABLE IF NOT EXISTS aggregate_simple STORED AS CSV WITH HEADER ROW LOCATION 'tests/aggregate_simple.csv'")
+ .await?
+ .collect()
+ .await?;
+
+ assert_eq!(result, Vec::new());
+
+ Ok(())
+}