You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/28 13:38:24 UTC
[arrow-datafusion] branch master updated: Support CREATE OR REPLACE TABLE (#2613)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new df2094ff5 Support CREATE OR REPLACE TABLE (#2613)
df2094ff5 is described below
commit df2094ff5169119cbf93ee47c7a9ab2e629737b5
Author: AssHero <hu...@gmail.com>
AuthorDate: Sat May 28 21:38:19 2022 +0800
Support CREATE OR REPLACE TABLE (#2613)
* support create or replace table ...
* 1. add test case for 'create or replace table'. 2. do not allow 'IF NOT EXISTS' to coexist with 'REPLACE'
* refine the code format
---
datafusion/core/src/execution/context.rs | 27 +++++++++++++++---
datafusion/core/tests/sql/create_drop.rs | 47 ++++++++++++++++++++++++++++++++
datafusion/expr/src/logical_plan/plan.rs | 2 ++
datafusion/expr/src/utils.rs | 2 ++
datafusion/sql/src/planner.rs | 2 ++
5 files changed, 76 insertions(+), 4 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 272cdc6da..652834d73 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -290,15 +290,17 @@ impl SessionContext {
name,
input,
if_not_exists,
+ or_replace,
}) => {
let table = self.table(name.as_str());
- match (if_not_exists, table) {
- (true, Ok(_)) => {
+ match (if_not_exists, or_replace, table) {
+ (true, false, Ok(_)) => {
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
- (_, Err(_)) => {
+ (false, true, Ok(_)) => {
+ self.deregister_table(name.as_str())?;
let plan = self.optimize(&input)?;
let physical =
Arc::new(DataFrame::new(self.state.clone(), &plan));
@@ -312,7 +314,24 @@ impl SessionContext {
self.register_table(name.as_str(), table)?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
- (false, Ok(_)) => Err(DataFusionError::Execution(format!(
+ (true, true, Ok(_)) => Err(DataFusionError::Internal(
+ "'IF NOT EXISTS' cannot coexist with 'REPLACE'".to_string(),
+ )),
+ (_, _, Err(_)) => {
+ let plan = self.optimize(&input)?;
+ let physical =
+ Arc::new(DataFrame::new(self.state.clone(), &plan));
+
+ let batches: Vec<_> = physical.collect_partitioned().await?;
+ let table = Arc::new(MemTable::try_new(
+ Arc::new(plan.schema().as_ref().into()),
+ batches,
+ )?);
+
+ self.register_table(name.as_str(), table)?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ (false, false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
name
))),
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 59df5d404..1d28f2e33 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -45,6 +45,53 @@ async fn create_table_as() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn create_or_replace_table_as() -> Result<()> {
+ // the information schema used to introduce cyclic Arcs
+ let ctx =
+ SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+ // Create table
+ ctx.sql("CREATE TABLE y AS VALUES (1,2),(3,4)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ // Replace table
+ ctx.sql("CREATE OR REPLACE TABLE y AS VALUES (5,6)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let sql_all = "SELECT * FROM y";
+ let results_all = execute_to_batches(&ctx, sql_all).await;
+
+ let expected = vec![
+ "+---------+---------+",
+ "| column1 | column2 |",
+ "+---------+---------+",
+ "| 5 | 6 |",
+ "+---------+---------+",
+ ];
+
+ assert_batches_eq!(expected, &results_all);
+
+ // 'IF NOT EXISTS' cannot coexist with 'REPLACE'
+ let result = ctx
+ .sql("CREATE OR REPLACE TABLE if not exists y AS VALUES (7,8)")
+ .await;
+ assert!(
+ result.is_err(),
+ "'IF NOT EXISTS' cannot coexist with 'REPLACE'"
+ );
+
+ Ok(())
+}
+
#[tokio::test]
async fn drop_table() -> Result<()> {
let ctx = SessionContext::new();
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index acdb90920..ea8075bf2 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1063,6 +1063,8 @@ pub struct CreateMemoryTable {
pub input: Arc<LogicalPlan>,
/// Option to not error if table already exists
pub if_not_exists: bool,
+ /// Option to replace table content if table already exists
+ pub or_replace: bool,
}
/// Creates a view.
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index bb81d6776..483b12b49 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -427,11 +427,13 @@ pub fn from_plan(
LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name,
if_not_exists,
+ or_replace,
..
}) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
input: Arc::new(inputs[0].clone()),
name: name.clone(),
if_not_exists: *if_not_exists,
+ or_replace: *or_replace,
})),
LogicalPlan::CreateView(CreateView {
name, or_replace, ..
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 3556b0203..3feb870ba 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -161,6 +161,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
table_properties,
with_options,
if_not_exists,
+ or_replace,
..
} if columns.is_empty()
&& constraints.is_empty()
@@ -173,6 +174,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
name: name.to_string(),
input: Arc::new(plan),
if_not_exists,
+ or_replace,
}))
}
Statement::CreateView {