You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/30 16:03:26 UTC
[arrow-datafusion] branch main updated: drop schema refactored (#6096)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a015798a3a drop schema refactored (#6096)
a015798a3a is described below
commit a015798a3aa7fc8543e13022f2a17be23b8afba4
Author: Jay Miller <37...@users.noreply.github.com>
AuthorDate: Sun Apr 30 12:03:19 2023 -0400
drop schema refactored (#6096)
---
datafusion/common/src/lib.rs | 2 +
datafusion/common/src/schema_reference.rs | 64 +++++++++++++++++
datafusion/core/src/catalog/catalog.rs | 83 +++++++++++++++++++++-
datafusion/core/src/execution/context.rs | 47 +++++++++++-
.../core/tests/sqllogictests/test_files/ddl.slt | 22 ++++++
.../test_files/information_schema.slt | 2 +-
datafusion/expr/src/logical_plan/ddl.rs | 28 +++++++-
datafusion/expr/src/logical_plan/mod.rs | 2 +-
datafusion/proto/src/logical_plan/mod.rs | 3 +
datafusion/sql/src/statement.rs | 24 +++++--
10 files changed, 264 insertions(+), 13 deletions(-)
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 1b59f43a67..7bbb3fbb71 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -26,6 +26,7 @@ pub mod parsers;
#[cfg(feature = "pyarrow")]
mod pyarrow;
pub mod scalar;
+mod schema_reference;
pub mod stats;
mod table_reference;
pub mod test_util;
@@ -39,6 +40,7 @@ pub use error::{
SharedResult,
};
pub use scalar::{ScalarType, ScalarValue};
+pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableReference};
diff --git a/datafusion/common/src/schema_reference.rs b/datafusion/common/src/schema_reference.rs
new file mode 100644
index 0000000000..a7dd49e5c3
--- /dev/null
+++ b/datafusion/common/src/schema_reference.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+
/// A reference to a schema, optionally qualified by the catalog that contains it.
///
/// `foo` is represented as [`SchemaReference::Bare`] and `my_catalog.foo` as
/// [`SchemaReference::Full`]. The [`Display`](std::fmt::Display) form is the
/// dotted `catalog.schema` (or just `schema`) string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum SchemaReference<'a> {
    /// A schema name without a catalog qualifier, e.g. `foo`.
    Bare {
        /// The schema name.
        schema: Cow<'a, str>,
    },
    /// A schema name qualified by its catalog, e.g. `my_catalog.foo`.
    Full {
        /// The schema name.
        schema: Cow<'a, str>,
        /// The catalog that contains the schema.
        catalog: Cow<'a, str>,
    },
}

impl SchemaReference<'_> {
    /// Get only the schema name that this references, ignoring any catalog.
    pub fn schema_name(&self) -> &str {
        match self {
            Self::Bare { schema } | Self::Full { schema, .. } => schema,
        }
    }

    /// Get the catalog qualifier of this reference, or `None` for a bare reference.
    pub fn catalog_name(&self) -> Option<&str> {
        match self {
            Self::Bare { .. } => None,
            Self::Full { catalog, .. } => Some(catalog.as_ref()),
        }
    }
}

/// A [`SchemaReference`] with a `'static` lifetime, i.e. one that does not borrow
/// from shorter-lived data and can therefore be stored in long-lived structures
/// such as logical plan nodes.
pub type OwnedSchemaReference = SchemaReference<'static>;

impl std::fmt::Display for SchemaReference<'_> {
    /// Formats as `schema` for bare references and `catalog.schema` for full ones.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Bare { schema } => write!(f, "{schema}"),
            Self::Full { schema, catalog } => write!(f, "{catalog}.{schema}"),
        }
    }
}

/// Borrows any schema reference (owned or not) as a `SchemaReference` whose
/// names are `Cow::Borrowed` from `value`, without copying the strings.
impl<'a> From<&'a SchemaReference<'_>> for SchemaReference<'a> {
    fn from(value: &'a SchemaReference<'_>) -> Self {
        match value {
            SchemaReference::Bare { schema } => SchemaReference::Bare {
                schema: Cow::Borrowed(schema),
            },
            SchemaReference::Full { schema, catalog } => SchemaReference::Full {
                schema: Cow::Borrowed(schema),
                catalog: Cow::Borrowed(catalog),
            },
        }
    }
}
diff --git a/datafusion/core/src/catalog/catalog.rs b/datafusion/core/src/catalog/catalog.rs
index ccec8bd25e..393d98dcb8 100644
--- a/datafusion/core/src/catalog/catalog.rs
+++ b/datafusion/core/src/catalog/catalog.rs
@@ -124,6 +124,26 @@ pub trait CatalogProvider: Sync + Send {
"Registering new schemas is not supported".to_string(),
))
}
+
+ /// Removes a schema from this catalog. Implementations of this method should return
+ /// errors if the schema exists but cannot be dropped. For example, in DataFusion's
+ /// default in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema
+ /// will only be successfully dropped when `cascade` is true.
+ /// This is equivalent to how DROP SCHEMA works in PostgreSQL.
+ ///
+ /// Implementations of this method should return None if schema with `name`
+ /// does not exist.
+ ///
+ /// By default returns a "Not Implemented" error
+ fn deregister_schema(
+ &self,
+ _name: &str,
+ _cascade: bool,
+ ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+ Err(DataFusionError::NotImplemented(
+ "Deregistering new schemas is not supported".to_string(),
+ ))
+ }
}
/// Simple in-memory implementation of a catalog.
@@ -160,13 +180,38 @@ impl CatalogProvider for MemoryCatalogProvider {
) -> Result<Option<Arc<dyn SchemaProvider>>> {
Ok(self.schemas.insert(name.into(), schema))
}
+
+ fn deregister_schema(
+ &self,
+ name: &str,
+ cascade: bool,
+ ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+ if let Some(schema) = self.schema(name) {
+ let table_names = schema.table_names();
+ match (table_names.is_empty(), cascade) {
+ (true, _) | (false, true) => {
+ let (_, removed) = self.schemas.remove(name).unwrap();
+ Ok(Some(removed))
+ }
+ (false, false) => Err(DataFusionError::Execution(format!(
+ "Cannot drop schema {} because other tables depend on it: {}",
+ name,
+ itertools::join(table_names.iter(), ", ")
+ ))),
+ }
+ } else {
+ Ok(None)
+ }
+ }
}
#[cfg(test)]
mod tests {
- use crate::catalog::schema::MemorySchemaProvider;
-
use super::*;
+ use crate::catalog::schema::MemorySchemaProvider;
+ use crate::datasource::empty::EmptyTable;
+ use crate::datasource::TableProvider;
+ use arrow::datatypes::Schema;
#[test]
fn default_register_schema_not_supported() {
@@ -194,4 +239,38 @@ mod tests {
Err(e) => assert_eq!(e.to_string(), "This feature is not implemented: Registering new schemas is not supported"),
};
}
+
/// A non-empty schema can only be dropped with `cascade`, and a refused drop
/// must leave the schema registered.
#[test]
fn memory_catalog_dereg_nonempty_schema() {
    let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;

    let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
    let test_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty())))
        as Arc<dyn TableProvider>;
    schema.register_table("t".into(), test_table).unwrap();

    cat.register_schema("foo", schema).unwrap();

    assert!(
        cat.deregister_schema("foo", false).is_err(),
        "dropping non-empty schema without cascade should error"
    );
    assert!(
        cat.schema("foo").is_some(),
        "a refused drop must leave the schema in place"
    );

    assert!(cat.deregister_schema("foo", true).unwrap().is_some());
    assert!(
        cat.schema("foo").is_none(),
        "cascade drop should remove the schema"
    );
}
+
/// An empty schema can be dropped even without `cascade`.
#[test]
fn memory_catalog_dereg_empty_schema() {
    let catalog: Arc<dyn CatalogProvider> = Arc::new(MemoryCatalogProvider::new());
    let empty: Arc<dyn SchemaProvider> = Arc::new(MemorySchemaProvider::new());
    catalog.register_schema("foo", empty).unwrap();

    let removed = catalog.deregister_schema("foo", false).unwrap();
    assert!(removed.is_some());
}
+
/// Deregistering an unknown schema is not an error for the catalog; it yields `None`.
#[test]
fn memory_catalog_dereg_missing() {
    let catalog: Arc<dyn CatalogProvider> = Arc::new(MemoryCatalogProvider::new());
    let outcome = catalog.deregister_schema("foo", false);
    assert!(outcome.unwrap().is_none());
}
}
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ee8cfe5dc5..d76e26e1b7 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -65,8 +65,8 @@ use crate::datasource::{
use crate::error::{DataFusionError, Result};
use crate::logical_expr::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
- CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
- SetVariable, TableSource, TableType, UNNAMED_TABLE,
+ CreateView, DropCatalogSchema, DropTable, DropView, Explain, LogicalPlan,
+ LogicalPlanBuilder, SetVariable, TableSource, TableType, UNNAMED_TABLE,
};
use crate::optimizer::OptimizerRule;
use datafusion_sql::{planner::ParserOptions, ResolvedTableReference, TableReference};
@@ -86,7 +86,7 @@ use crate::physical_plan::PhysicalPlanner;
use crate::variable::{VarProvider, VarType};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
-use datafusion_common::OwnedTableReference;
+use datafusion_common::{OwnedTableReference, SchemaReference};
use datafusion_sql::{
parser::DFParser,
planner::{ContextProvider, SqlToRel},
@@ -383,6 +383,7 @@ impl SessionContext {
DdlStatement::CreateCatalog(cmd) => self.create_catalog(cmd).await,
DdlStatement::DropTable(cmd) => self.drop_table(cmd).await,
DdlStatement::DropView(cmd) => self.drop_view(cmd).await,
+ DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await,
},
// TODO what about the other statements (like TransactionStart and TransactionEnd)
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
@@ -653,6 +654,46 @@ impl SessionContext {
}
}
+ async fn drop_schema(&self, cmd: DropCatalogSchema) -> Result<DataFrame> {
+ let DropCatalogSchema {
+ name,
+ if_exists: allow_missing,
+ cascade,
+ schema: _,
+ } = cmd;
+ let catalog = {
+ let state = self.state.read();
+ let catalog_name = match &name {
+ SchemaReference::Full { catalog, .. } => catalog.to_string(),
+ SchemaReference::Bare { .. } => {
+ state.config_options().catalog.default_catalog.to_string()
+ }
+ };
+ if let Some(catalog) = state.catalog_list.catalog(&catalog_name) {
+ catalog
+ } else if allow_missing {
+ return self.return_empty_dataframe();
+ } else {
+ return self.schema_doesnt_exist_err(name);
+ }
+ };
+ let dereg = catalog.deregister_schema(name.schema_name(), cascade)?;
+ match (dereg, allow_missing) {
+ (None, true) => self.return_empty_dataframe(),
+ (None, false) => self.schema_doesnt_exist_err(name),
+ (Some(_), _) => self.return_empty_dataframe(),
+ }
+ }
+
+ fn schema_doesnt_exist_err(
+ &self,
+ schemaref: SchemaReference<'_>,
+ ) -> Result<DataFrame> {
+ Err(DataFusionError::Execution(format!(
+ "Schema '{schemaref}' doesn't exist."
+ )))
+ }
+
async fn set_variable(&self, stmt: SetVariable) -> Result<DataFrame> {
let SetVariable {
variable, value, ..
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index 49ec168d70..856994881f 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -643,3 +643,25 @@ select * from t;
statement ok
drop table t;
+
+##########
+# Dropping schemas
+##########
+
+statement error DataFusion error: Execution error: Cannot drop schema foo_schema because other tables depend on it: bar
+DROP SCHEMA foo_schema;
+
+statement ok
+DROP SCHEMA foo_schema CASCADE;
+
+statement error DataFusion error: Execution error: Schema 'doesnt_exist' doesn't exist.
+DROP SCHEMA doesnt_exist;
+
+statement ok
+DROP SCHEMA IF EXISTS doesnt_exist;
+
+statement ok
+CREATE SCHEMA empty_schema;
+
+statement ok
+DROP SCHEMA empty_schema;
diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 80187564f9..3ea42583c8 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -375,7 +375,7 @@ SHOW CREATE TABLE test.xyz
----
datafusion test xyz CREATE VIEW test.xyz AS SELECT * FROM abc
-statement error DataFusion error: This feature is not implemented: Only `DROP TABLE/VIEW
+statement error DataFusion error: Execution error: Cannot drop schema test because other tables depend on it: xyz
DROP SCHEMA test;
statement ok
diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs
index 9f87417281..eeaa1c5ddb 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion_common::Column;
use datafusion_common::{
parsers::CompressionTypeVariant, DFSchemaRef, OwnedTableReference,
};
+use datafusion_common::{Column, OwnedSchemaReference};
use std::collections::HashMap;
use std::sync::Arc;
use std::{
@@ -45,6 +45,8 @@ pub enum DdlStatement {
DropTable(DropTable),
/// Drops a view.
DropView(DropView),
+ /// Drops a catalog schema
+ DropCatalogSchema(DropCatalogSchema),
}
impl DdlStatement {
@@ -62,6 +64,7 @@ impl DdlStatement {
DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
DdlStatement::DropTable(DropTable { schema, .. }) => schema,
DdlStatement::DropView(DropView { schema, .. }) => schema,
+ DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
}
}
@@ -76,6 +79,7 @@ impl DdlStatement {
DdlStatement::CreateCatalog(_) => "CreateCatalog",
DdlStatement::DropTable(_) => "DropTable",
DdlStatement::DropView(_) => "DropView",
+ DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
}
}
@@ -91,6 +95,7 @@ impl DdlStatement {
DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
DdlStatement::DropTable(_) => vec![],
DdlStatement::DropView(_) => vec![],
+ DdlStatement::DropCatalogSchema(_) => vec![],
}
}
@@ -147,6 +152,14 @@ impl DdlStatement {
}) => {
write!(f, "DropView: {name:?} if not exist:={if_exists}")
}
+ DdlStatement::DropCatalogSchema(DropCatalogSchema {
+ name,
+ if_exists,
+ cascade,
+ ..
+ }) => {
+ write!(f, "DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}")
+ }
}
}
}
@@ -273,3 +286,16 @@ pub struct DropView {
/// Dummy schema
pub schema: DFSchemaRef,
}
+
+/// Drops a schema
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct DropCatalogSchema {
+ /// The schema name
+ pub name: OwnedSchemaReference,
+ /// If the schema exists
+ pub if_exists: bool,
+ /// Whether drop should cascade
+ pub cascade: bool,
+ /// Dummy schema
+ pub schema: DFSchemaRef,
+}
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 6a0249320e..01862c3d54 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -29,7 +29,7 @@ pub use builder::{
};
pub use ddl::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
- CreateView, DdlStatement, DropTable, DropView,
+ CreateView, DdlStatement, DropCatalogSchema, DropTable, DropView,
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 709c391772..8789b7302e 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1372,6 +1372,9 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::Ddl(DdlStatement::DropView(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DropView",
)),
+ LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
+ "LogicalPlan serde is not yet implemented for DropCatalogSchema",
+ )),
LogicalPlan::Statement(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Statement",
)),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index b48261dcc4..779a79e155 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -26,7 +26,7 @@ use arrow_schema::DataType;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, ExprSchema,
- OwnedTableReference, Result, TableReference, ToDFSchema,
+ OwnedTableReference, Result, SchemaReference, TableReference, ToDFSchema,
};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::builder::project;
@@ -35,8 +35,8 @@ use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{
cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView,
- DescribeTable, DmlStatement, DropTable, DropView, EmptyRelation, Explain,
- ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, Prepare,
+ DescribeTable, DmlStatement, DropCatalogSchema, DropTable, DropView, EmptyRelation,
+ Explain, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, Prepare,
SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
WriteOp,
@@ -242,7 +242,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
object_type,
if_exists,
mut names,
- cascade: _,
+ cascade,
restrict: _,
purge: _,
} => {
@@ -272,8 +272,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema: DFSchemaRef::new(DFSchema::empty()),
})))
}
+ ObjectType::Schema => {
+ let name = match name {
+ TableReference::Bare { table } => Ok(SchemaReference::Bare { schema: table } ) ,
+ TableReference::Partial { schema, table } => Ok(SchemaReference::Full { schema: table,catalog: schema }),
+ TableReference::Full { catalog: _, schema: _, table: _ } => {
+ Err(ParserError("Invalid schema specifier (has 3 parts)".to_string()))
+ },
+ }?;
+ Ok(LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(DropCatalogSchema {
+ name,
+ if_exists,
+ cascade,
+ schema: DFSchemaRef::new(DFSchema::empty()),
+ })))},
_ => Err(DataFusionError::NotImplemented(
- "Only `DROP TABLE/VIEW ...` statement is supported currently"
+ "Only `DROP TABLE/VIEW/SCHEMA ...` statement is supported currently"
.to_string(),
)),
}