You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/30 16:03:26 UTC

[arrow-datafusion] branch main updated: drop schema refactored (#6096)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a015798a3a drop schema refactored (#6096)
a015798a3a is described below

commit a015798a3aa7fc8543e13022f2a17be23b8afba4
Author: Jay Miller <37...@users.noreply.github.com>
AuthorDate: Sun Apr 30 12:03:19 2023 -0400

    drop schema refactored (#6096)
---
 datafusion/common/src/lib.rs                       |  2 +
 datafusion/common/src/schema_reference.rs          | 64 +++++++++++++++++
 datafusion/core/src/catalog/catalog.rs             | 83 +++++++++++++++++++++-
 datafusion/core/src/execution/context.rs           | 47 +++++++++++-
 .../core/tests/sqllogictests/test_files/ddl.slt    | 22 ++++++
 .../test_files/information_schema.slt              |  2 +-
 datafusion/expr/src/logical_plan/ddl.rs            | 28 +++++++-
 datafusion/expr/src/logical_plan/mod.rs            |  2 +-
 datafusion/proto/src/logical_plan/mod.rs           |  3 +
 datafusion/sql/src/statement.rs                    | 24 +++++--
 10 files changed, 264 insertions(+), 13 deletions(-)

diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 1b59f43a67..7bbb3fbb71 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -26,6 +26,7 @@ pub mod parsers;
 #[cfg(feature = "pyarrow")]
 mod pyarrow;
 pub mod scalar;
+mod schema_reference;
 pub mod stats;
 mod table_reference;
 pub mod test_util;
@@ -39,6 +40,7 @@ pub use error::{
     SharedResult,
 };
 pub use scalar::{ScalarType, ScalarValue};
+pub use schema_reference::{OwnedSchemaReference, SchemaReference};
 pub use stats::{ColumnStatistics, Statistics};
 pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableReference};
 
diff --git a/datafusion/common/src/schema_reference.rs b/datafusion/common/src/schema_reference.rs
new file mode 100644
index 0000000000..a7dd49e5c3
--- /dev/null
+++ b/datafusion/common/src/schema_reference.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+
+/// A reference to a schema, optionally qualified by the catalog that contains it.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+pub enum SchemaReference<'a> {
+    Bare {
+        schema: Cow<'a, str>,
+    },
+    Full {
+        schema: Cow<'a, str>,
+        catalog: Cow<'a, str>,
+    },
+}
+
+impl SchemaReference<'_> {
+    /// Returns the schema name, ignoring any catalog qualifier.
+    pub fn schema_name(&self) -> &str {
+        match self {
+            Self::Bare { schema } | Self::Full { schema, .. } => schema,
+        }
+    }
+}
+
+pub type OwnedSchemaReference = SchemaReference<'static>;
+
+impl std::fmt::Display for SchemaReference<'_> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Bare { schema } => write!(f, "{schema}"),
+            Self::Full { schema, catalog } => write!(f, "{catalog}.{schema}"),
+        }
+    }
+}
+
+impl<'a> From<&'a OwnedSchemaReference> for SchemaReference<'a> {
+    fn from(value: &'a OwnedSchemaReference) -> Self {
+        match value {
+            SchemaReference::Bare { schema } => SchemaReference::Bare {
+                schema: Cow::Borrowed(schema),
+            },
+            SchemaReference::Full { schema, catalog } => SchemaReference::Full {
+                schema: Cow::Borrowed(schema),
+                catalog: Cow::Borrowed(catalog),
+            },
+        }
+    }
+}
diff --git a/datafusion/core/src/catalog/catalog.rs b/datafusion/core/src/catalog/catalog.rs
index ccec8bd25e..393d98dcb8 100644
--- a/datafusion/core/src/catalog/catalog.rs
+++ b/datafusion/core/src/catalog/catalog.rs
@@ -124,6 +124,26 @@ pub trait CatalogProvider: Sync + Send {
             "Registering new schemas is not supported".to_string(),
         ))
     }
+
+    /// Removes a schema from this catalog. Implementations of this method should return
+    /// errors if the schema exists but cannot be dropped. For example, in DataFusion's
+    /// default in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema
+    /// will only be successfully dropped when `cascade` is true.
+    /// This is equivalent to how DROP SCHEMA works in PostgreSQL.
+    ///
+    /// Implementations of this method should return `Ok(None)` if no schema named
+    /// `name` exists.
+    ///
+    /// By default returns a "Not Implemented" error
+    fn deregister_schema(
+        &self,
+        _name: &str,
+        _cascade: bool,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        Err(DataFusionError::NotImplemented(
+            "Deregistering schemas is not supported".to_string(),
+        ))
+    }
 }
 
 /// Simple in-memory implementation of a catalog.
@@ -160,13 +180,38 @@ impl CatalogProvider for MemoryCatalogProvider {
     ) -> Result<Option<Arc<dyn SchemaProvider>>> {
         Ok(self.schemas.insert(name.into(), schema))
     }
+
+    fn deregister_schema(
+        &self,
+        name: &str,
+        cascade: bool,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        if let Some(schema) = self.schema(name) {
+            let table_names = schema.table_names();
+            match (table_names.is_empty(), cascade) {
+                (true, _) | (false, true) => {
+                    // Don't unwrap: a concurrent call may already have removed it.
+                    Ok(self.schemas.remove(name).map(|(_, removed)| removed))
+                }
+                (false, false) => Err(DataFusionError::Execution(format!(
+                    "Cannot drop schema {} because other tables depend on it: {}",
+                    name,
+                    itertools::join(table_names.iter(), ", ")
+                ))),
+            }
+        } else {
+            Ok(None)
+        }
+    }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::catalog::schema::MemorySchemaProvider;
-
     use super::*;
+    use crate::catalog::schema::MemorySchemaProvider;
+    use crate::datasource::empty::EmptyTable;
+    use crate::datasource::TableProvider;
+    use arrow::datatypes::Schema;
 
     #[test]
     fn default_register_schema_not_supported() {
@@ -194,4 +239,38 @@ mod tests {
             Err(e) => assert_eq!(e.to_string(), "This feature is not implemented: Registering new schemas is not supported"),
         };
     }
+
+    #[test]
+    fn memory_catalog_dereg_nonempty_schema() {
+        let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
+
+        let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
+        let test_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty())))
+            as Arc<dyn TableProvider>;
+        schema.register_table("t".into(), test_table).unwrap();
+
+        cat.register_schema("foo", schema).unwrap();
+
+        assert!(
+            cat.deregister_schema("foo", false).is_err(),
+            "dropping non-empty schema without cascade should error"
+        );
+        assert!(cat.deregister_schema("foo", true).unwrap().is_some());
+    }
+
+    #[test]
+    fn memory_catalog_dereg_empty_schema() {
+        let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
+
+        let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
+        cat.register_schema("foo", schema).unwrap();
+
+        assert!(cat.deregister_schema("foo", false).unwrap().is_some());
+    }
+
+    #[test]
+    fn memory_catalog_dereg_missing() {
+        let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
+        assert!(cat.deregister_schema("foo", false).unwrap().is_none());
+    }
 }
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ee8cfe5dc5..d76e26e1b7 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -65,8 +65,8 @@ use crate::datasource::{
 use crate::error::{DataFusionError, Result};
 use crate::logical_expr::{
     CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
-    CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
-    SetVariable, TableSource, TableType, UNNAMED_TABLE,
+    CreateView, DropCatalogSchema, DropTable, DropView, Explain, LogicalPlan,
+    LogicalPlanBuilder, SetVariable, TableSource, TableType, UNNAMED_TABLE,
 };
 use crate::optimizer::OptimizerRule;
 use datafusion_sql::{planner::ParserOptions, ResolvedTableReference, TableReference};
@@ -86,7 +86,7 @@ use crate::physical_plan::PhysicalPlanner;
 use crate::variable::{VarProvider, VarType};
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
-use datafusion_common::OwnedTableReference;
+use datafusion_common::{OwnedTableReference, SchemaReference};
 use datafusion_sql::{
     parser::DFParser,
     planner::{ContextProvider, SqlToRel},
@@ -383,6 +383,7 @@ impl SessionContext {
                 DdlStatement::CreateCatalog(cmd) => self.create_catalog(cmd).await,
                 DdlStatement::DropTable(cmd) => self.drop_table(cmd).await,
                 DdlStatement::DropView(cmd) => self.drop_view(cmd).await,
+                DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await,
             },
             // TODO what about the other statements (like TransactionStart and TransactionEnd)
             LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
@@ -653,6 +654,46 @@ impl SessionContext {
         }
     }
 
+    async fn drop_schema(&self, cmd: DropCatalogSchema) -> Result<DataFrame> {
+        let DropCatalogSchema {
+            name,
+            if_exists: allow_missing,
+            cascade,
+            schema: _,
+        } = cmd;
+        let catalog = {
+            let state = self.state.read();
+            let catalog_name = match &name {
+                SchemaReference::Full { catalog, .. } => catalog.to_string(),
+                SchemaReference::Bare { .. } => {
+                    state.config_options().catalog.default_catalog.to_string()
+                }
+            };
+            if let Some(catalog) = state.catalog_list.catalog(&catalog_name) {
+                catalog
+            } else if allow_missing {
+                return self.return_empty_dataframe();
+            } else {
+                return self.schema_doesnt_exist_err(name);
+            }
+        };
+        let dereg = catalog.deregister_schema(name.schema_name(), cascade)?;
+        match (dereg, allow_missing) {
+            (None, true) => self.return_empty_dataframe(),
+            (None, false) => self.schema_doesnt_exist_err(name),
+            (Some(_), _) => self.return_empty_dataframe(),
+        }
+    }
+
+    fn schema_doesnt_exist_err(
+        &self,
+        schemaref: SchemaReference<'_>,
+    ) -> Result<DataFrame> {
+        Err(DataFusionError::Execution(format!(
+            "Schema '{schemaref}' doesn't exist."
+        )))
+    }
+
     async fn set_variable(&self, stmt: SetVariable) -> Result<DataFrame> {
         let SetVariable {
             variable, value, ..
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index 49ec168d70..856994881f 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -643,3 +643,25 @@ select * from t;
 
 statement ok
 drop table t;
+
+##########
+# Dropping schemas
+##########
+
+statement error DataFusion error: Execution error: Cannot drop schema foo_schema because other tables depend on it: bar
+DROP SCHEMA foo_schema;
+
+statement ok
+DROP SCHEMA foo_schema CASCADE;
+
+statement error DataFusion error: Execution error: Schema 'doesnt_exist' doesn't exist.
+DROP SCHEMA doesnt_exist;
+
+statement ok
+DROP SCHEMA IF EXISTS doesnt_exist;
+
+statement ok
+CREATE SCHEMA empty_schema;
+
+statement ok
+DROP SCHEMA empty_schema;
diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 80187564f9..3ea42583c8 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -375,7 +375,7 @@ SHOW CREATE TABLE test.xyz
 ----
 datafusion test xyz CREATE VIEW test.xyz AS SELECT * FROM abc
 
-statement error DataFusion error: This feature is not implemented: Only `DROP TABLE/VIEW
+statement error DataFusion error: Execution error: Cannot drop schema test because other tables depend on it: xyz
 DROP SCHEMA test;
 
 statement ok
diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs
index 9f87417281..eeaa1c5ddb 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion_common::Column;
 use datafusion_common::{
     parsers::CompressionTypeVariant, DFSchemaRef, OwnedTableReference,
 };
+use datafusion_common::{Column, OwnedSchemaReference};
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::{
@@ -45,6 +45,8 @@ pub enum DdlStatement {
     DropTable(DropTable),
     /// Drops a view.
     DropView(DropView),
+    /// Drops a catalog schema
+    DropCatalogSchema(DropCatalogSchema),
 }
 
 impl DdlStatement {
@@ -62,6 +64,7 @@ impl DdlStatement {
             DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
             DdlStatement::DropTable(DropTable { schema, .. }) => schema,
             DdlStatement::DropView(DropView { schema, .. }) => schema,
+            DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
         }
     }
 
@@ -76,6 +79,7 @@ impl DdlStatement {
             DdlStatement::CreateCatalog(_) => "CreateCatalog",
             DdlStatement::DropTable(_) => "DropTable",
             DdlStatement::DropView(_) => "DropView",
+            DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
         }
     }
 
@@ -91,6 +95,7 @@ impl DdlStatement {
             DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
             DdlStatement::DropTable(_) => vec![],
             DdlStatement::DropView(_) => vec![],
+            DdlStatement::DropCatalogSchema(_) => vec![],
         }
     }
 
@@ -147,6 +152,14 @@ impl DdlStatement {
                     }) => {
                         write!(f, "DropView: {name:?} if not exist:={if_exists}")
                     }
+                    DdlStatement::DropCatalogSchema(DropCatalogSchema {
+                        name,
+                        if_exists,
+                        cascade,
+                        ..
+                    }) => {
+                        write!(f, "DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}")
+                    }
                 }
             }
         }
@@ -273,3 +286,16 @@ pub struct DropView {
     /// Dummy schema
     pub schema: DFSchemaRef,
 }
+
+/// Drops a schema from a catalog (`DROP SCHEMA`)
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct DropCatalogSchema {
+    /// The schema name, optionally qualified with its catalog
+    pub name: OwnedSchemaReference,
+    /// If true, a missing schema is not an error (`IF EXISTS`)
+    pub if_exists: bool,
+    /// Whether the drop should cascade (`CASCADE`)
+    pub cascade: bool,
+    /// Dummy schema
+    pub schema: DFSchemaRef,
+}
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 6a0249320e..01862c3d54 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -29,7 +29,7 @@ pub use builder::{
 };
 pub use ddl::{
     CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
-    CreateView, DdlStatement, DropTable, DropView,
+    CreateView, DdlStatement, DropCatalogSchema, DropTable, DropView,
 };
 pub use dml::{DmlStatement, WriteOp};
 pub use plan::{
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 709c391772..8789b7302e 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1372,6 +1372,9 @@ impl AsLogicalPlan for LogicalPlanNode {
             LogicalPlan::Ddl(DdlStatement::DropView(_)) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for DropView",
             )),
+            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
+                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
+            )),
             LogicalPlan::Statement(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for Statement",
             )),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index b48261dcc4..779a79e155 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -26,7 +26,7 @@ use arrow_schema::DataType;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
     Column, DFField, DFSchema, DFSchemaRef, DataFusionError, ExprSchema,
-    OwnedTableReference, Result, TableReference, ToDFSchema,
+    OwnedTableReference, Result, SchemaReference, TableReference, ToDFSchema,
 };
 use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
 use datafusion_expr::logical_plan::builder::project;
@@ -35,8 +35,8 @@ use datafusion_expr::utils::expr_to_columns;
 use datafusion_expr::{
     cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
     CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView,
-    DescribeTable, DmlStatement, DropTable, DropView, EmptyRelation, Explain,
-    ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, Prepare,
+    DescribeTable, DmlStatement, DropCatalogSchema, DropTable, DropView, EmptyRelation,
+    Explain, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, Prepare,
     SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
     TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
     WriteOp,
@@ -242,7 +242,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 object_type,
                 if_exists,
                 mut names,
-                cascade: _,
+                cascade,
                 restrict: _,
                 purge: _,
             } => {
@@ -272,8 +272,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                             schema: DFSchemaRef::new(DFSchema::empty()),
                         })))
                     }
+                    ObjectType::Schema => {
+                        let name = match name {
+                            TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table } ) ,
+                            TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table,catalog: schema }),
+                            TableReference::Full { catalog: _, schema: _, table: _ } => {
+                                Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
+                            },
+                        }?;
+                        Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
+                            name,
+                            if_exists,
+                            cascade,
+                            schema: DFSchemaRef::new(DFSchema::empty()),
+                        })))},
                     _ => Err(DataFusionError::NotImplemented(
-                        "Only `DROP TABLE/VIEW  ...` statement is supported currently"
+                        "Only `DROP TABLE/VIEW/SCHEMA  ...` statement is supported currently"
                             .to_string(),
                     )),
                 }