You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/28 13:38:24 UTC

[arrow-datafusion] branch master updated: Support CREATE OR REPLACE TABLE (#2613)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new df2094ff5 Support CREATE OR REPLACE TABLE (#2613)
df2094ff5 is described below

commit df2094ff5169119cbf93ee47c7a9ab2e629737b5
Author: AssHero <hu...@gmail.com>
AuthorDate: Sat May 28 21:38:19 2022 +0800

    Support CREATE OR REPLACE TABLE (#2613)
    
    * support create or replace table ...
    
    * 1. add a test case for 'create or replace table'. 2. do not allow 'IF NOT EXISTS' to coexist with 'REPLACE'
    
    * refine the code format
---
 datafusion/core/src/execution/context.rs | 27 +++++++++++++++---
 datafusion/core/tests/sql/create_drop.rs | 47 ++++++++++++++++++++++++++++++++
 datafusion/expr/src/logical_plan/plan.rs |  2 ++
 datafusion/expr/src/utils.rs             |  2 ++
 datafusion/sql/src/planner.rs            |  2 ++
 5 files changed, 76 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 272cdc6da..652834d73 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -290,15 +290,17 @@ impl SessionContext {
                 name,
                 input,
                 if_not_exists,
+                or_replace,
             }) => {
                 let table = self.table(name.as_str());
 
-                match (if_not_exists, table) {
-                    (true, Ok(_)) => {
+                match (if_not_exists, or_replace, table) {
+                    (true, false, Ok(_)) => {
                         let plan = LogicalPlanBuilder::empty(false).build()?;
                         Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
                     }
-                    (_, Err(_)) => {
+                    (false, true, Ok(_)) => {
+                        self.deregister_table(name.as_str())?;
                         let plan = self.optimize(&input)?;
                         let physical =
                             Arc::new(DataFrame::new(self.state.clone(), &plan));
@@ -312,7 +314,24 @@ impl SessionContext {
                         self.register_table(name.as_str(), table)?;
                         Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
                     }
-                    (false, Ok(_)) => Err(DataFusionError::Execution(format!(
+                    (true, true, Ok(_)) => Err(DataFusionError::Internal(
+                        "'IF NOT EXISTS' cannot coexist with 'REPLACE'".to_string(),
+                    )),
+                    (_, _, Err(_)) => {
+                        let plan = self.optimize(&input)?;
+                        let physical =
+                            Arc::new(DataFrame::new(self.state.clone(), &plan));
+
+                        let batches: Vec<_> = physical.collect_partitioned().await?;
+                        let table = Arc::new(MemTable::try_new(
+                            Arc::new(plan.schema().as_ref().into()),
+                            batches,
+                        )?);
+
+                        self.register_table(name.as_str(), table)?;
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                    }
+                    (false, false, Ok(_)) => Err(DataFusionError::Execution(format!(
                         "Table '{:?}' already exists",
                         name
                     ))),
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 59df5d404..1d28f2e33 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -45,6 +45,53 @@ async fn create_table_as() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn create_or_replace_table_as() -> Result<()> {
+    // Run with the information schema enabled (it used to introduce cyclic Arcs)
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    // Create table `y` holding two rows: (1,2) and (3,4)
+    ctx.sql("CREATE TABLE y AS VALUES (1,2),(3,4)")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    // Replace the existing `y`; afterwards it must contain only the row (5,6)
+    ctx.sql("CREATE OR REPLACE TABLE y AS VALUES (5,6)")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let sql_all = "SELECT * FROM y";
+    let results_all = execute_to_batches(&ctx, sql_all).await;
+
+    let expected = vec![
+        "+---------+---------+",
+        "| column1 | column2 |",
+        "+---------+---------+",
+        "| 5       | 6       |",
+        "+---------+---------+",
+    ];
+
+    assert_batches_eq!(expected, &results_all);
+
+    // Combining 'OR REPLACE' with 'IF NOT EXISTS' on an existing table must yield an error
+    let result = ctx
+        .sql("CREATE OR REPLACE TABLE if not exists y AS VALUES (7,8)")
+        .await;
+    assert!(
+        result.is_err(),
+        "'IF NOT EXISTS' cannot coexist with 'REPLACE'"
+    );
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn drop_table() -> Result<()> {
     let ctx = SessionContext::new();
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index acdb90920..ea8075bf2 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1063,6 +1063,8 @@ pub struct CreateMemoryTable {
     pub input: Arc<LogicalPlan>,
     /// Option to not error if table already exists
     pub if_not_exists: bool,
+    /// Option to replace table content if table already exists
+    pub or_replace: bool,
 }
 
 /// Creates a view.
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index bb81d6776..483b12b49 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -427,11 +427,13 @@ pub fn from_plan(
         LogicalPlan::CreateMemoryTable(CreateMemoryTable {
             name,
             if_not_exists,
+            or_replace,
             ..
         }) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
             input: Arc::new(inputs[0].clone()),
             name: name.clone(),
             if_not_exists: *if_not_exists,
+            or_replace: *or_replace,
         })),
         LogicalPlan::CreateView(CreateView {
             name, or_replace, ..
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 3556b0203..3feb870ba 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -161,6 +161,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 table_properties,
                 with_options,
                 if_not_exists,
+                or_replace,
                 ..
             } if columns.is_empty()
                 && constraints.is_empty()
@@ -173,6 +174,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     name: name.to_string(),
                     input: Arc::new(plan),
                     if_not_exists,
+                    or_replace,
                 }))
             }
             Statement::CreateView {