You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yj...@apache.org on 2022/04/06 04:49:02 UTC

[arrow-datafusion] branch master updated: Add CREATE DATABASE command to SQL (#2094)

This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b09a5c6c Add CREATE DATABASE command to SQL (#2094)
8b09a5c6c is described below

commit 8b09a5c6ce11759d6b0e3009c6245acf54904872
Author: Matthew Turner <ma...@outlook.com>
AuthorDate: Wed Apr 6 00:48:57 2022 -0400

    Add CREATE DATABASE command to SQL (#2094)
    
    * Initial commit
    
    * Ballista
    
    * Fix proto
    
    * Ballista fix
    
    * Add test
    
    * Add create schema to test
    
    * Parse catalog in schema ref
    
    * Clippy
    
    * Comment updates
---
 ballista/rust/core/proto/ballista.proto            |  7 +++
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 31 +++++++++-
 datafusion/core/src/execution/context.rs           | 72 ++++++++++++++++++++--
 datafusion/core/src/logical_plan/mod.rs            |  6 +-
 datafusion/core/src/logical_plan/plan.rs           | 27 +++++++-
 .../core/src/optimizer/common_subexpr_eliminate.rs |  1 +
 .../core/src/optimizer/projection_push_down.rs     |  1 +
 datafusion/core/src/optimizer/utils.rs             |  3 +-
 datafusion/core/src/physical_plan/planner.rs       |  9 +++
 datafusion/core/src/sql/planner.rs                 | 11 +++-
 10 files changed, 154 insertions(+), 14 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 228c8bee2..b4c63b04b 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -51,6 +51,7 @@ message LogicalPlanNode {
     LogicalExtensionNode extension = 17;
     CreateCatalogSchemaNode create_catalog_schema = 18;
     UnionNode union = 19;
+    CreateCatalogNode create_catalog = 20;
   }
 }
 
@@ -156,6 +157,12 @@ message CreateCatalogSchemaNode {
   datafusion.DfSchema schema = 3;
 }
 
+message CreateCatalogNode {
+  string catalog_name = 1;
+  bool if_not_exists = 2;
+  datafusion.DfSchema schema = 3;
+}
+
 // a node containing data for defining values list. unlike in SQL where it's two dimensional, here
 // the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
 message ValuesNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 490b2ebec..c84f6efa6 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -33,8 +33,9 @@ use datafusion::logical_plan::plan::{
     Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
 };
 use datafusion::logical_plan::{
-    Column, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, JoinConstraint,
-    Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan, Values,
+    Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
+    JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan,
+    Values,
 };
 use datafusion::prelude::SessionContext;
 
@@ -344,6 +345,19 @@ impl AsLogicalPlan for LogicalPlanNode {
                     schema: pb_schema.try_into()?,
                 }))
             }
+            LogicalPlanType::CreateCatalog(create_catalog) => {
+                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
+                    BallistaError::General(String::from(
+                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
+                    ))
+                })?;
+
+                Ok(LogicalPlan::CreateCatalog(CreateCatalog {
+                    catalog_name: create_catalog.catalog_name.clone(),
+                    if_not_exists: create_catalog.if_not_exists,
+                    schema: pb_schema.try_into()?,
+                }))
+            }
             LogicalPlanType::Analyze(analyze) => {
                 let input: LogicalPlan =
                     into_logical_plan!(analyze.input, ctx, extension_codec)?;
@@ -814,6 +828,19 @@ impl AsLogicalPlan for LogicalPlanNode {
                     },
                 )),
             }),
+            LogicalPlan::CreateCatalog(CreateCatalog {
+                catalog_name,
+                if_not_exists,
+                schema: df_schema,
+            }) => Ok(protobuf::LogicalPlanNode {
+                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
+                    protobuf::CreateCatalogNode {
+                        catalog_name: catalog_name.clone(),
+                        if_not_exists: *if_not_exists,
+                        schema: Some(df_schema.into()),
+                    },
+                )),
+            }),
             LogicalPlan::Analyze(a) => {
                 let input = protobuf::LogicalPlanNode::try_from_logical_plan(
                     a.input.as_ref(),
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 04129ccea..2ec133ce5 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -58,8 +58,8 @@ use crate::datasource::listing::ListingTableConfig;
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::{
-    CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, DropTable,
-    FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
+    CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
+    DropTable, FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
 };
 use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
 use crate::optimizer::filter_push_down::FilterPushDown;
@@ -330,14 +330,23 @@ impl SessionContext {
             }) => {
                 // sqlparser doesnt accept database / catalog as parameter to CREATE SCHEMA
                 // so for now, we default to default catalog
-                let catalog = self.catalog(DEFAULT_CATALOG).ok_or_else(|| {
+                let tokens: Vec<&str> = schema_name.split('.').collect();
+                let (catalog, schema_name) = match tokens.len() {
+                    1 => Ok((DEFAULT_CATALOG, schema_name.as_str())),
+                    2 => Ok((tokens[0], tokens[1])),
+                    _ => Err(DataFusionError::Execution(format!(
+                        "Unable to parse catalog from {}",
+                        schema_name
+                    ))),
+                }?;
+                let catalog = self.catalog(catalog).ok_or_else(|| {
                     DataFusionError::Execution(format!(
                         "Missing '{}' catalog",
                         DEFAULT_CATALOG
                     ))
                 })?;
 
-                let schema = catalog.schema(&schema_name);
+                let schema = catalog.schema(schema_name);
 
                 match (if_not_exists, schema) {
                     (true, Some(_)) => {
@@ -346,7 +355,7 @@ impl SessionContext {
                     }
                     (true, None) | (false, None) => {
                         let schema = Arc::new(MemorySchemaProvider::new());
-                        catalog.register_schema(&schema_name, schema)?;
+                        catalog.register_schema(schema_name, schema)?;
                         let plan = LogicalPlanBuilder::empty(false).build()?;
                         Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
                     }
@@ -356,6 +365,33 @@ impl SessionContext {
                     ))),
                 }
             }
+            LogicalPlan::CreateCatalog(CreateCatalog {
+                catalog_name,
+                if_not_exists,
+                ..
+            }) => {
+                let catalog = self.catalog(catalog_name.as_str());
+
+                match (if_not_exists, catalog) {
+                    (true, Some(_)) => {
+                        let plan = LogicalPlanBuilder::empty(false).build()?;
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                    }
+                    (true, None) | (false, None) => {
+                        let new_catalog = Arc::new(MemoryCatalogProvider::new());
+                        self.state
+                            .write()
+                            .catalog_list
+                            .register_catalog(catalog_name, new_catalog);
+                        let plan = LogicalPlanBuilder::empty(false).build()?;
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                    }
+                    (false, Some(_)) => Err(DataFusionError::Execution(format!(
+                        "Catalog '{:?}' already exists",
+                        catalog_name
+                    ))),
+                }
+            }
 
             plan => Ok(Arc::new(DataFrame::new(
                 self.state.clone(),
@@ -3256,6 +3292,32 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn sql_create_catalog() -> Result<()> {
+        // the information schema used to introduce cyclic Arcs
+        let ctx = SessionContext::with_config(
+            SessionConfig::new().with_information_schema(true),
+        );
+
+        // Create catalog
+        ctx.sql("CREATE DATABASE test").await?.collect().await?;
+
+        // Create schema
+        ctx.sql("CREATE SCHEMA test.abc").await?.collect().await?;
+
+        // Add table to schema
+        ctx.sql("CREATE TABLE test.abc.y AS VALUES (1,2,3)")
+            .await?
+            .collect()
+            .await?;
+
+        // Check table exists in schema
+        let results = ctx.sql("SELECT * FROM information_schema.tables WHERE table_catalog='test' AND table_schema='abc' AND table_name = 'y'").await.unwrap().collect().await.unwrap();
+
+        assert_eq!(results[0].num_rows(), 1);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn normalized_column_identifiers() {
         // create local execution context
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index 960deb2cd..48a2f3721 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -63,9 +63,9 @@ pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
 pub use extension::UserDefinedLogicalNode;
 pub use operators::Operator;
 pub use plan::{
-    CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable,
-    EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
-    PlanVisitor, Repartition, TableScan, Union, Values,
+    CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
+    CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan,
+    Partitioning, PlanType, PlanVisitor, Repartition, TableScan, Union, Values,
 };
 pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
 pub use registry::FunctionRegistry;
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 327f4d764..88d2af070 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -197,7 +197,18 @@ pub struct CreateExternalTable {
 pub struct CreateCatalogSchema {
     /// The table schema
     pub schema_name: String,
-    /// The table name
+    /// Do nothing (except issuing a notice) if a schema with the same name already exists
+    pub if_not_exists: bool,
+    /// Empty schema
+    pub schema: DFSchemaRef,
+}
+
+/// Creates a catalog (aka "Database").
+#[derive(Clone)]
+pub struct CreateCatalog {
+    /// The catalog name
+    pub catalog_name: String,
+    /// Do nothing (except issuing a notice) if a catalog with the same name already exists
     pub if_not_exists: bool,
     /// Empty schema
     pub schema: DFSchemaRef,
@@ -367,6 +378,8 @@ pub enum LogicalPlan {
     CreateMemoryTable(CreateMemoryTable),
     /// Creates a new catalog schema.
     CreateCatalogSchema(CreateCatalogSchema),
+    /// Creates a new catalog (aka "Database").
+    CreateCatalog(CreateCatalog),
     /// Drops a table.
     DropTable(DropTable),
     /// Values expression. See
@@ -414,6 +427,7 @@ impl LogicalPlan {
             LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
                 schema
             }
+            LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
             LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
         }
     }
@@ -456,7 +470,8 @@ impl LogicalPlan {
             | LogicalPlan::Analyze(Analyze { schema, .. })
             | LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
             | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. })
-            | LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
+            | LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. })
+            | LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => {
                 vec![schema]
             }
             LogicalPlan::Limit(Limit { input, .. })
@@ -512,6 +527,7 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateMemoryTable(_)
             | LogicalPlan::CreateCatalogSchema(_)
+            | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
             | LogicalPlan::CrossJoin(_)
             | LogicalPlan::Analyze { .. }
@@ -548,6 +564,7 @@ impl LogicalPlan {
             | LogicalPlan::Values { .. }
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateCatalogSchema(_)
+            | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_) => vec![],
         }
     }
@@ -701,6 +718,7 @@ impl LogicalPlan {
             | LogicalPlan::Values(_)
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateCatalogSchema(_)
+            | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_) => true,
         };
         if !recurse {
@@ -1069,6 +1087,11 @@ impl LogicalPlan {
                     }) => {
                         write!(f, "CreateCatalogSchema: {:?}", schema_name)
                     }
+                    LogicalPlan::CreateCatalog(CreateCatalog {
+                        catalog_name, ..
+                    }) => {
+                        write!(f, "CreateCatalog: {:?}", catalog_name)
+                    }
                     LogicalPlan::DropTable(DropTable {
                         name, if_exists, ..
                     }) => {
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index b2884ea96..3db1ca4d3 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -222,6 +222,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
         | LogicalPlan::Analyze { .. }
         | LogicalPlan::CreateMemoryTable(_)
         | LogicalPlan::CreateCatalogSchema(_)
+        | LogicalPlan::CreateCatalog(_)
         | LogicalPlan::DropTable(_)
         | LogicalPlan::Extension { .. } => {
             // apply the optimization to all inputs of the plan
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 0a0e1c025..7f43b5595 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -443,6 +443,7 @@ fn optimize_plan(
         | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::CreateMemoryTable(_)
         | LogicalPlan::CreateCatalogSchema(_)
+        | LogicalPlan::CreateCatalog(_)
         | LogicalPlan::DropTable(_)
         | LogicalPlan::CrossJoin(_)
         | LogicalPlan::Extension { .. } => {
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index f096cfb5a..c01515d35 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -272,7 +272,8 @@ pub fn from_plan(
         | LogicalPlan::TableScan { .. }
         | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::DropTable(_)
-        | LogicalPlan::CreateCatalogSchema(_) => {
+        | LogicalPlan::CreateCatalogSchema(_)
+        | LogicalPlan::CreateCatalog(_) => {
             // All of these plan types have no inputs / exprs so should not be called
             assert!(expr.is_empty(), "{:?} should have no exprs", plan);
             assert!(inputs.is_empty(), "{:?}  should have no inputs", plan);
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 4dbaca203..d6d40a740 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -818,6 +818,15 @@ impl DefaultPhysicalPlanner {
                         "Unsupported logical plan: CreateCatalogSchema".to_string(),
                     ))
                 }
+                LogicalPlan::CreateCatalog(_) => {
+                    // There is no default plan for "CREATE DATABASE".
+                    // It must be handled at a higher level (so
+                    // that the catalog can be registered with
+                    // the context)
+                    Err(DataFusionError::Internal(
+                        "Unsupported logical plan: CreateCatalog".to_string(),
+                    ))
+                }
                 | LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable (_) => {
                     // Create a dummy exec.
                     Ok(Arc::new(EmptyExec::new(
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 770d16872..153c405d6 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -29,7 +29,7 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
 use crate::logical_plan::Expr::Alias;
 use crate::logical_plan::{
     and, builder::expand_qualified_wildcard, builder::expand_wildcard, col, lit,
-    normalize_col, union_with_alias, Column, CreateCatalogSchema,
+    normalize_col, union_with_alias, Column, CreateCatalog, CreateCatalogSchema,
     CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, DFSchema,
     DFSchemaRef, DropTable, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
     ToDFSchema, ToStringifiedPlan,
@@ -183,6 +183,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 if_not_exists,
                 schema: Arc::new(DFSchema::empty()),
             })),
+            Statement::CreateDatabase {
+                db_name,
+                if_not_exists,
+                ..
+            } => Ok(LogicalPlan::CreateCatalog(CreateCatalog {
+                catalog_name: db_name.to_string(),
+                if_not_exists,
+                schema: Arc::new(DFSchema::empty()),
+            })),
             Statement::Drop {
                 object_type: ObjectType::Table,
                 if_exists,