You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yj...@apache.org on 2022/04/06 04:49:02 UTC
[arrow-datafusion] branch master updated: Add CREATE DATABASE command to SQL (#2094)
This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 8b09a5c6c Add CREATE DATABASE command to SQL (#2094)
8b09a5c6c is described below
commit 8b09a5c6ce11759d6b0e3009c6245acf54904872
Author: Matthew Turner <ma...@outlook.com>
AuthorDate: Wed Apr 6 00:48:57 2022 -0400
Add CREATE DATABASE command to SQL (#2094)
* Initial commit
* Ballista
* Fix proto
* Ballista fix
* Add test
* Add create schema to test
* Parse catalog in schema ref
* Clippy
* Comment updates
---
ballista/rust/core/proto/ballista.proto | 7 +++
ballista/rust/core/src/serde/logical_plan/mod.rs | 31 +++++++++-
datafusion/core/src/execution/context.rs | 72 ++++++++++++++++++++--
datafusion/core/src/logical_plan/mod.rs | 6 +-
datafusion/core/src/logical_plan/plan.rs | 27 +++++++-
.../core/src/optimizer/common_subexpr_eliminate.rs | 1 +
.../core/src/optimizer/projection_push_down.rs | 1 +
datafusion/core/src/optimizer/utils.rs | 3 +-
datafusion/core/src/physical_plan/planner.rs | 9 +++
datafusion/core/src/sql/planner.rs | 11 +++-
10 files changed, 154 insertions(+), 14 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 228c8bee2..b4c63b04b 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -51,6 +51,7 @@ message LogicalPlanNode {
LogicalExtensionNode extension = 17;
CreateCatalogSchemaNode create_catalog_schema = 18;
UnionNode union = 19;
+ CreateCatalogNode create_catalog = 20;
}
}
@@ -156,6 +157,12 @@ message CreateCatalogSchemaNode {
datafusion.DfSchema schema = 3;
}
+message CreateCatalogNode {
+ string catalog_name = 1;
+ bool if_not_exists = 2;
+ datafusion.DfSchema schema = 3;
+}
+
// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
message ValuesNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 490b2ebec..c84f6efa6 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -33,8 +33,9 @@ use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
};
use datafusion::logical_plan::{
- Column, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, JoinConstraint,
- Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan, Values,
+ Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
+ JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan,
+ Values,
};
use datafusion::prelude::SessionContext;
@@ -344,6 +345,19 @@ impl AsLogicalPlan for LogicalPlanNode {
schema: pb_schema.try_into()?,
}))
}
+ LogicalPlanType::CreateCatalog(create_catalog) => {
+ let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
+ BallistaError::General(String::from(
+ "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
+ ))
+ })?;
+
+ Ok(LogicalPlan::CreateCatalog(CreateCatalog {
+ catalog_name: create_catalog.catalog_name.clone(),
+ if_not_exists: create_catalog.if_not_exists,
+ schema: pb_schema.try_into()?,
+ }))
+ }
LogicalPlanType::Analyze(analyze) => {
let input: LogicalPlan =
into_logical_plan!(analyze.input, ctx, extension_codec)?;
@@ -814,6 +828,19 @@ impl AsLogicalPlan for LogicalPlanNode {
},
)),
}),
+ LogicalPlan::CreateCatalog(CreateCatalog {
+ catalog_name,
+ if_not_exists,
+ schema: df_schema,
+ }) => Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::CreateCatalog(
+ protobuf::CreateCatalogNode {
+ catalog_name: catalog_name.clone(),
+ if_not_exists: *if_not_exists,
+ schema: Some(df_schema.into()),
+ },
+ )),
+ }),
LogicalPlan::Analyze(a) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
a.input.as_ref(),
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 04129ccea..2ec133ce5 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -58,8 +58,8 @@ use crate::datasource::listing::ListingTableConfig;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
- CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, DropTable,
- FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
+ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
+ DropTable, FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::filter_push_down::FilterPushDown;
@@ -330,14 +330,23 @@ impl SessionContext {
}) => {
// sqlparser doesnt accept database / catalog as parameter to CREATE SCHEMA
// so for now, we default to default catalog
- let catalog = self.catalog(DEFAULT_CATALOG).ok_or_else(|| {
+ let tokens: Vec<&str> = schema_name.split('.').collect();
+ let (catalog, schema_name) = match tokens.len() {
+ 1 => Ok((DEFAULT_CATALOG, schema_name.as_str())),
+ 2 => Ok((tokens[0], tokens[1])),
+ _ => Err(DataFusionError::Execution(format!(
+ "Unable to parse catalog from {}",
+ schema_name
+ ))),
+ }?;
+ let catalog = self.catalog(catalog).ok_or_else(|| {
DataFusionError::Execution(format!(
"Missing '{}' catalog",
DEFAULT_CATALOG
))
})?;
- let schema = catalog.schema(&schema_name);
+ let schema = catalog.schema(schema_name);
match (if_not_exists, schema) {
(true, Some(_)) => {
@@ -346,7 +355,7 @@ impl SessionContext {
}
(true, None) | (false, None) => {
let schema = Arc::new(MemorySchemaProvider::new());
- catalog.register_schema(&schema_name, schema)?;
+ catalog.register_schema(schema_name, schema)?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
@@ -356,6 +365,33 @@ impl SessionContext {
))),
}
}
+ LogicalPlan::CreateCatalog(CreateCatalog {
+ catalog_name,
+ if_not_exists,
+ ..
+ }) => {
+ let catalog = self.catalog(catalog_name.as_str());
+
+ match (if_not_exists, catalog) {
+ (true, Some(_)) => {
+ let plan = LogicalPlanBuilder::empty(false).build()?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ (true, None) | (false, None) => {
+ let new_catalog = Arc::new(MemoryCatalogProvider::new());
+ self.state
+ .write()
+ .catalog_list
+ .register_catalog(catalog_name, new_catalog);
+ let plan = LogicalPlanBuilder::empty(false).build()?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ (false, Some(_)) => Err(DataFusionError::Execution(format!(
+ "Catalog '{:?}' already exists",
+ catalog_name
+ ))),
+ }
+ }
plan => Ok(Arc::new(DataFrame::new(
self.state.clone(),
@@ -3256,6 +3292,32 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn sql_create_catalog() -> Result<()> {
+ // enable the information schema (which used to introduce cyclic Arcs) so the new table can be looked up below
+ let ctx = SessionContext::with_config(
+ SessionConfig::new().with_information_schema(true),
+ );
+
+ // Create catalog
+ ctx.sql("CREATE DATABASE test").await?.collect().await?;
+
+ // Create schema
+ ctx.sql("CREATE SCHEMA test.abc").await?.collect().await?;
+
+ // Add table to schema
+ ctx.sql("CREATE TABLE test.abc.y AS VALUES (1,2,3)")
+ .await?
+ .collect()
+ .await?;
+
+ // Check table exists in schema
+ let results = ctx.sql("SELECT * FROM information_schema.tables WHERE table_catalog='test' AND table_schema='abc' AND table_name = 'y'").await.unwrap().collect().await.unwrap();
+
+ assert_eq!(results[0].num_rows(), 1);
+ Ok(())
+ }
+
#[tokio::test]
async fn normalized_column_identifiers() {
// create local execution context
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index 960deb2cd..48a2f3721 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -63,9 +63,9 @@ pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{
- CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable,
- EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
- PlanVisitor, Repartition, TableScan, Union, Values,
+ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
+ CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan,
+ Partitioning, PlanType, PlanVisitor, Repartition, TableScan, Union, Values,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 327f4d764..88d2af070 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -197,7 +197,18 @@ pub struct CreateExternalTable {
pub struct CreateCatalogSchema {
/// The table schema
pub schema_name: String,
- /// The table name
+ /// Do nothing (except issuing a notice) if a schema with the same name already exists
+ pub if_not_exists: bool,
+ /// Empty schema
+ pub schema: DFSchemaRef,
+}
+
+/// Creates a catalog (aka "Database").
+#[derive(Clone)]
+pub struct CreateCatalog {
+ /// The catalog name
+ pub catalog_name: String,
+ /// Do nothing (except issuing a notice) if a catalog with the same name already exists
pub if_not_exists: bool,
/// Empty schema
pub schema: DFSchemaRef,
@@ -367,6 +378,8 @@ pub enum LogicalPlan {
CreateMemoryTable(CreateMemoryTable),
/// Creates a new catalog schema.
CreateCatalogSchema(CreateCatalogSchema),
+ /// Creates a new catalog (aka "Database").
+ CreateCatalog(CreateCatalog),
/// Drops a table.
DropTable(DropTable),
/// Values expression. See
@@ -414,6 +427,7 @@ impl LogicalPlan {
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
schema
}
+ LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
}
}
@@ -456,7 +470,8 @@ impl LogicalPlan {
| LogicalPlan::Analyze(Analyze { schema, .. })
| LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
| LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. })
- | LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
+ | LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. })
+ | LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => {
vec![schema]
}
LogicalPlan::Limit(Limit { input, .. })
@@ -512,6 +527,7 @@ impl LogicalPlan {
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateCatalogSchema(_)
+ | LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze { .. }
@@ -548,6 +564,7 @@ impl LogicalPlan {
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateCatalogSchema(_)
+ | LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_) => vec![],
}
}
@@ -701,6 +718,7 @@ impl LogicalPlan {
| LogicalPlan::Values(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateCatalogSchema(_)
+ | LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_) => true,
};
if !recurse {
@@ -1069,6 +1087,11 @@ impl LogicalPlan {
}) => {
write!(f, "CreateCatalogSchema: {:?}", schema_name)
}
+ LogicalPlan::CreateCatalog(CreateCatalog {
+ catalog_name, ..
+ }) => {
+ write!(f, "CreateCatalog: {:?}", catalog_name)
+ }
LogicalPlan::DropTable(DropTable {
name, if_exists, ..
}) => {
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index b2884ea96..3db1ca4d3 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -222,6 +222,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::Analyze { .. }
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateCatalogSchema(_)
+ | LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 0a0e1c025..7f43b5595 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -443,6 +443,7 @@ fn optimize_plan(
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateCatalogSchema(_)
+ | LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Extension { .. } => {
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index f096cfb5a..c01515d35 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -272,7 +272,8 @@ pub fn from_plan(
| LogicalPlan::TableScan { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::DropTable(_)
- | LogicalPlan::CreateCatalogSchema(_) => {
+ | LogicalPlan::CreateCatalogSchema(_)
+ | LogicalPlan::CreateCatalog(_) => {
// All of these plan types have no inputs / exprs so should not be called
assert!(expr.is_empty(), "{:?} should have no exprs", plan);
assert!(inputs.is_empty(), "{:?} should have no inputs", plan);
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 4dbaca203..d6d40a740 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -818,6 +818,15 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateCatalogSchema".to_string(),
))
}
+ LogicalPlan::CreateCatalog(_) => {
+ // There is no default plan for "CREATE DATABASE".
+ // It must be handled at a higher level (so
+ // that the catalog can be registered with
+ // the context)
+ Err(DataFusionError::Internal(
+ "Unsupported logical plan: CreateCatalog".to_string(),
+ ))
+ }
| LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable (_) => {
// Create a dummy exec.
Ok(Arc::new(EmptyExec::new(
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 770d16872..153c405d6 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -29,7 +29,7 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
and, builder::expand_qualified_wildcard, builder::expand_wildcard, col, lit,
- normalize_col, union_with_alias, Column, CreateCatalogSchema,
+ normalize_col, union_with_alias, Column, CreateCatalog, CreateCatalogSchema,
CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, DFSchema,
DFSchemaRef, DropTable, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
ToDFSchema, ToStringifiedPlan,
@@ -183,6 +183,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if_not_exists,
schema: Arc::new(DFSchema::empty()),
})),
+ Statement::CreateDatabase {
+ db_name,
+ if_not_exists,
+ ..
+ } => Ok(LogicalPlan::CreateCatalog(CreateCatalog {
+ catalog_name: db_name.to_string(),
+ if_not_exists,
+ schema: Arc::new(DFSchema::empty()),
+ })),
Statement::Drop {
object_type: ObjectType::Table,
if_exists,