You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/05 18:01:41 UTC

[arrow-datafusion] branch master updated: Add IF NOT EXISTS to `CREATE TABLE` and `CREATE EXTERNAL TABLE` (#2143)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 823011590 Add IF NOT EXISTS to `CREATE TABLE` and `CREATE EXTERNAL TABLE` (#2143)
823011590 is described below

commit 823011590f4509179806a8f8b02aef5bdcbfaec8
Author: Matthew Turner <ma...@outlook.com>
AuthorDate: Tue Apr 5 14:01:36 2022 -0400

    Add IF NOT EXISTS to `CREATE TABLE` and `CREATE EXTERNAL TABLE` (#2143)
    
    * Add IF NOT EXISTS to CREATE EXTERNAL TABLE
    
    * Add test
    
    * ine for create table
    
    * Add sql tests
    
    * Clippy
---
 ballista/rust/client/src/context.rs              | 88 ++++++++++++---------
 ballista/rust/core/proto/ballista.proto          |  1 +
 ballista/rust/core/src/serde/logical_plan/mod.rs |  4 +
 datafusion/core/src/execution/context.rs         | 97 ++++++++++++++++--------
 datafusion/core/src/logical_plan/plan.rs         |  4 +
 datafusion/core/src/optimizer/utils.rs           | 15 ++--
 datafusion/core/src/sql/parser.rs                | 26 +++++++
 datafusion/core/src/sql/planner.rs               |  4 +
 datafusion/core/tests/sql/create_drop.rs         | 65 ++++++++++++++++
 9 files changed, 230 insertions(+), 74 deletions(-)

diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 0a002e888..451eac25a 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -376,44 +376,58 @@ impl BallistaContext {
                 ref file_type,
                 ref has_header,
                 ref table_partition_cols,
-            }) => match file_type {
-                FileType::CSV => {
-                    self.register_csv(
-                        name,
-                        location,
-                        CsvReadOptions::new()
-                            .schema(&schema.as_ref().to_owned().into())
-                            .has_header(*has_header)
-                            .table_partition_cols(table_partition_cols.to_vec()),
-                    )
-                    .await?;
-                    Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
-                }
-                FileType::Parquet => {
-                    self.register_parquet(
-                        name,
-                        location,
-                        ParquetReadOptions::default()
-                            .table_partition_cols(table_partition_cols.to_vec()),
-                    )
-                    .await?;
-                    Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
+                ref if_not_exists,
+            }) => {
+                let table_exists = ctx.table_exist(name.as_str())?;
+
+                match (if_not_exists, table_exists) {
+                    (_, false) => match file_type {
+                        FileType::CSV => {
+                            self.register_csv(
+                                name,
+                                location,
+                                CsvReadOptions::new()
+                                    .schema(&schema.as_ref().to_owned().into())
+                                    .has_header(*has_header)
+                                    .table_partition_cols(table_partition_cols.to_vec()),
+                            )
+                            .await?;
+                            Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
+                        }
+                        FileType::Parquet => {
+                            self.register_parquet(
+                                name,
+                                location,
+                                ParquetReadOptions::default()
+                                    .table_partition_cols(table_partition_cols.to_vec()),
+                            )
+                            .await?;
+                            Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
+                        }
+                        FileType::Avro => {
+                            self.register_avro(
+                                name,
+                                location,
+                                AvroReadOptions::default()
+                                    .table_partition_cols(table_partition_cols.to_vec()),
+                            )
+                            .await?;
+                            Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
+                        }
+                        _ => Err(DataFusionError::NotImplemented(format!(
+                            "Unsupported file type {:?}.",
+                            file_type
+                        ))),
+                    },
+                    (true, true) => {
+                        Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
+                    }
+                    (false, true) => Err(DataFusionError::Execution(format!(
+                        "Table '{}' already exists",
+                        name // Display, not Debug, so the name is not double-quoted
+                    ))),
                 }
-                FileType::Avro => {
-                    self.register_avro(
-                        name,
-                        location,
-                        AvroReadOptions::default()
-                            .table_partition_cols(table_partition_cols.to_vec()),
-                    )
-                    .await?;
-                    Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
-                }
-                _ => Err(DataFusionError::NotImplemented(format!(
-                    "Unsupported file type {:?}.",
-                    file_type
-                ))),
-            },
+            }
 
             _ => ctx.sql(sql).await,
         }
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 7f96a00ea..228c8bee2 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -147,6 +147,7 @@ message CreateExternalTableNode {
   bool has_header = 4;
   datafusion.DfSchema schema = 5;
   repeated string table_partition_cols = 6;
+  bool if_not_exists = 7;
 }
 
 message CreateCatalogSchemaNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 3aed2db00..490b2ebec 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -328,6 +328,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     table_partition_cols: create_extern_table
                         .table_partition_cols
                         .clone(),
+                    if_not_exists: create_extern_table.if_not_exists,
                 }))
             }
             LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
@@ -775,6 +776,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 has_header,
                 schema: df_schema,
                 table_partition_cols,
+                if_not_exists,
             }) => {
                 use datafusion::sql::parser::FileType;
 
@@ -794,6 +796,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                             has_header: *has_header,
                             schema: Some(df_schema.into()),
                             table_partition_cols: table_partition_cols.clone(),
+                            if_not_exists: *if_not_exists,
                         },
                     )),
                 })
@@ -1129,6 +1132,7 @@ mod roundtrip_tests {
                     file_type: *file,
                     has_header: true,
                     table_partition_cols: vec![],
+                    if_not_exists: false,
                 });
 
             roundtrip_test!(create_table_node);
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ab65d28f7..04129ccea 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -218,6 +218,7 @@ impl SessionContext {
                 ref file_type,
                 ref has_header,
                 ref table_partition_cols,
+                ref if_not_exists,
             }) => {
                 let (file_format, file_extension) = match file_type {
                     FileType::CSV => (
@@ -238,40 +239,74 @@ impl SessionContext {
                         DEFAULT_JSON_EXTENSION,
                     ),
                 };
-
-                let options = ListingOptions {
-                    format: file_format,
-                    collect_stat: false,
-                    file_extension: file_extension.to_owned(),
-                    target_partitions: self.copied_config().target_partitions,
-                    table_partition_cols: table_partition_cols.clone(),
-                };
-
-                // TODO make schema in CreateExternalTable optional instead of empty
-                let provided_schema = if schema.fields().is_empty() {
-                    None
-                } else {
-                    Some(Arc::new(schema.as_ref().to_owned().into()))
-                };
-                self.register_listing_table(name, location, options, provided_schema)
-                    .await?;
-                let plan = LogicalPlanBuilder::empty(false).build()?;
-                Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                let table = self.table(name.as_str());
+                match (if_not_exists, table) {
+                    (true, Ok(_)) => {
+                        let plan = LogicalPlanBuilder::empty(false).build()?;
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                    }
+                    (_, Err(_)) => {
+                        // TODO make schema in CreateExternalTable optional instead of empty
+                        let provided_schema = if schema.fields().is_empty() {
+                            None
+                        } else {
+                            Some(Arc::new(schema.as_ref().to_owned().into()))
+                        };
+                        let options = ListingOptions {
+                            format: file_format,
+                            collect_stat: false,
+                            file_extension: file_extension.to_owned(),
+                            target_partitions: self.copied_config().target_partitions,
+                            table_partition_cols: table_partition_cols.clone(),
+                        };
+                        self.register_listing_table(
+                            name,
+                            location,
+                            options,
+                            provided_schema,
+                        )
+                        .await?;
+                        let plan = LogicalPlanBuilder::empty(false).build()?;
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                    }
+                    (false, Ok(_)) => Err(DataFusionError::Execution(format!(
+                        "Table '{}' already exists",
+                        name // Display, not Debug, so the name is not double-quoted
+                    ))),
+                }
             }
 
-            LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, input }) => {
-                let plan = self.optimize(&input)?;
-                let physical = Arc::new(DataFrame::new(self.state.clone(), &plan));
-
-                let batches: Vec<_> = physical.collect_partitioned().await?;
-                let table = Arc::new(MemTable::try_new(
-                    Arc::new(plan.schema().as_ref().into()),
-                    batches,
-                )?);
-                self.register_table(name.as_str(), table)?;
+            LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+                name,
+                input,
+                if_not_exists,
+            }) => {
+                let table = self.table(name.as_str());
 
-                let plan = LogicalPlanBuilder::empty(false).build()?;
-                Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                match (if_not_exists, table) {
+                    (true, Ok(_)) => {
+                        let plan = LogicalPlanBuilder::empty(false).build()?;
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                    }
+                    (_, Err(_)) => {
+                        let plan = self.optimize(&input)?;
+                        let physical =
+                            Arc::new(DataFrame::new(self.state.clone(), &plan));
+
+                        let batches: Vec<_> = physical.collect_partitioned().await?;
+                        let table = Arc::new(MemTable::try_new(
+                            Arc::new(plan.schema().as_ref().into()),
+                            batches,
+                        )?);
+                        self.register_table(name.as_str(), table)?;
+                        let plan = LogicalPlanBuilder::empty(false).build()?;
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                    }
+                    (false, Ok(_)) => Err(DataFusionError::Execution(format!(
+                        "Table '{}' already exists",
+                        name // Display, not Debug, so the name is not double-quoted
+                    ))),
+                }
             }
 
             LogicalPlan::DropTable(DropTable {
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index eeb06d559..327f4d764 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -169,6 +169,8 @@ pub struct CreateMemoryTable {
     pub name: String,
     /// The logical plan
     pub input: Arc<LogicalPlan>,
+    /// Option to not error if table already exists
+    pub if_not_exists: bool,
 }
 
 /// Creates an external table.
@@ -186,6 +188,8 @@ pub struct CreateExternalTable {
     pub has_header: bool,
     /// Partition Columns
     pub table_partition_cols: Vec<String>,
+    /// Option to not error if table already exists
+    pub if_not_exists: bool,
 }
 
 /// Creates a schema.
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index 14e5ffa8c..f096cfb5a 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -226,12 +226,15 @@ pub fn from_plan(
             n: *n,
             input: Arc::new(inputs[0].clone()),
         })),
-        LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, .. }) => {
-            Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
-                input: Arc::new(inputs[0].clone()),
-                name: name.clone(),
-            }))
-        }
+        LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+            name,
+            if_not_exists,
+            ..
+        }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+            input: Arc::new(inputs[0].clone()),
+            name: name.clone(),
+            if_not_exists: *if_not_exists,
+        })),
         LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
             node: e.node.from_template(expr, inputs),
         })),
diff --git a/datafusion/core/src/sql/parser.rs b/datafusion/core/src/sql/parser.rs
index 566e91c92..29ad9f8cd 100644
--- a/datafusion/core/src/sql/parser.rs
+++ b/datafusion/core/src/sql/parser.rs
@@ -80,6 +80,8 @@ pub struct CreateExternalTable {
     pub location: String,
     /// Partition Columns
     pub table_partition_cols: Vec<String>,
+    /// Option to not error if table already exists
+    pub if_not_exists: bool,
 }
 
 /// DataFusion Statement representations.
@@ -298,6 +300,9 @@ impl<'a> DFParser<'a> {
 
     fn parse_create_external_table(&mut self) -> Result<Statement, ParserError> {
         self.parser.expect_keyword(Keyword::TABLE)?;
+        let if_not_exists =
+            self.parser
+                .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
         let table_name = self.parser.parse_object_name()?;
         let (columns, _) = self.parse_columns()?;
         self.parser
@@ -324,6 +329,7 @@ impl<'a> DFParser<'a> {
             has_header,
             location,
             table_partition_cols,
+            if_not_exists,
         };
         Ok(Statement::CreateExternalTable(create))
     }
@@ -420,6 +426,7 @@ mod tests {
             has_header: false,
             location: "foo.csv".into(),
             table_partition_cols: vec![],
+            if_not_exists: false,
         });
         expect_parse_ok(sql, expected)?;
 
@@ -433,6 +440,7 @@ mod tests {
             has_header: false,
             location: "foo.csv".into(),
             table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
+            if_not_exists: false,
         });
         expect_parse_ok(sql, expected)?;
 
@@ -449,6 +457,7 @@ mod tests {
                 has_header: true,
                 location: "foo.csv".into(),
                 table_partition_cols: vec![],
+                if_not_exists: false,
             });
             expect_parse_ok(sql, expected)?;
         }
@@ -462,6 +471,7 @@ mod tests {
             has_header: false,
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
+            if_not_exists: false,
         });
         expect_parse_ok(sql, expected)?;
 
@@ -474,6 +484,7 @@ mod tests {
             has_header: false,
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
+            if_not_exists: false,
         });
         expect_parse_ok(sql, expected)?;
 
@@ -486,6 +497,21 @@ mod tests {
             has_header: false,
             location: "foo.avro".into(),
             table_partition_cols: vec![],
+            if_not_exists: false,
+        });
+        expect_parse_ok(sql, expected)?;
+
+        // positive case: IF NOT EXISTS is accepted and sets the if_not_exists flag
+        let sql =
+            "CREATE EXTERNAL TABLE IF NOT EXISTS t STORED AS PARQUET LOCATION 'foo.parquet'";
+        let expected = Statement::CreateExternalTable(CreateExternalTable {
+            name: "t".into(),
+            columns: vec![],
+            file_type: FileType::Parquet,
+            has_header: false,
+            location: "foo.parquet".into(),
+            table_partition_cols: vec![],
+            if_not_exists: true,
         });
         expect_parse_ok(sql, expected)?;
 
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index b59444ed2..770d16872 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -156,6 +156,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 constraints,
                 table_properties,
                 with_options,
+                if_not_exists,
                 ..
             } if columns.is_empty()
                 && constraints.is_empty()
@@ -167,6 +168,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
                     name: name.to_string(),
                     input: Arc::new(plan),
+                    if_not_exists,
                 }))
             }
             Statement::CreateTable { .. } => Err(DataFusionError::NotImplemented(
@@ -309,6 +311,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             has_header,
             location,
             table_partition_cols,
+            if_not_exists,
         } = statement;
 
         // semantic checks
@@ -335,6 +338,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             file_type,
             has_header,
             table_partition_cols,
+            if_not_exists,
         }))
     }
 
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 748bd7ae9..6b1769751 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -126,3 +126,68 @@ async fn create_external_table_with_timestamps() {
     ];
     assert_batches_sorted_eq!(expected, &result);
 }
+
+#[tokio::test]
+#[should_panic(expected = "already exists")]
+async fn sql_create_duplicate_table() {
+    // A second plain CREATE TABLE for an existing name must fail
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    // Create table
+    ctx.sql("CREATE TABLE y AS VALUES (1,2,3)")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    // Create table again without IF NOT EXISTS: the panic is expected here
+    let result = ctx
+        .sql("CREATE TABLE y AS VALUES (1,2,3)")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    assert_eq!(result, Vec::new());
+}
+
+#[tokio::test]
+async fn sql_create_table_if_not_exists() -> Result<()> {
+    // IF NOT EXISTS must turn re-creating an existing table into a no-op
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    // Create table
+    ctx.sql("CREATE TABLE y AS VALUES (1,2,3)")
+        .await?
+        .collect()
+        .await?;
+
+    // Create table again with IF NOT EXISTS: succeeds and yields no rows
+    let result = ctx
+        .sql("CREATE TABLE IF NOT EXISTS y AS VALUES (1,2,3)")
+        .await?
+        .collect()
+        .await?;
+
+    assert_eq!(result, Vec::new());
+
+    // Create external table
+    ctx.sql("CREATE EXTERNAL TABLE aggregate_simple STORED AS CSV WITH HEADER ROW LOCATION 'tests/aggregate_simple.csv'")
+        .await?
+        .collect()
+        .await?;
+
+    // Create external table again with IF NOT EXISTS: succeeds and yields no rows
+    let result = ctx.sql("CREATE EXTERNAL TABLE IF NOT EXISTS aggregate_simple STORED AS CSV WITH HEADER ROW LOCATION 'tests/aggregate_simple.csv'")
+        .await?
+        .collect()
+        .await?;
+
+    assert_eq!(result, Vec::new());
+
+    Ok(())
+}