You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/11/10 07:04:54 UTC
[arrow-datafusion] branch master updated: Add drop table support (#1266)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 7060961 Add drop table support (#1266)
7060961 is described below
commit 7060961fd944efa48c7ed93677041f4d089b5da5
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Nov 9 23:04:49 2021 -0800
Add drop table support (#1266)
* Add drop table support.
* Rename to DropTable.
* Remove undeclared crate.
* Fix clippy error.
* For review comments.
---
.../rust/core/src/serde/logical_plan/to_proto.rs | 3 +++
datafusion/src/execution/context.rs | 13 ++++++++++++
datafusion/src/logical_plan/plan.rs | 23 ++++++++++++++++++++--
.../src/optimizer/common_subexpr_eliminate.rs | 1 +
datafusion/src/optimizer/constant_folding.rs | 1 +
datafusion/src/optimizer/projection_push_down.rs | 1 +
datafusion/src/optimizer/utils.rs | 3 ++-
datafusion/src/physical_plan/planner.rs | 4 ++--
datafusion/src/sql/planner.rs | 22 ++++++++++++++++++---
datafusion/tests/sql.rs | 20 +++++++++++++++++++
10 files changed, 83 insertions(+), 8 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index ca83ffd..e4c7656 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1011,6 +1011,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
LogicalPlan::CreateMemoryTable { .. } => Err(proto_error(
"Error converting CreateMemoryTable. Not yet supported in Ballista",
)),
+ LogicalPlan::DropTable { .. } => Err(proto_error(
+ "Error converting DropTable. Not yet supported in Ballista",
+ )),
}
}
}
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 99358d6..78a7884 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -255,6 +255,19 @@ impl ExecutionContext {
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
+ LogicalPlan::DropTable { name, if_exist, .. } => {
+ let returned = self.deregister_table(name.as_str())?;
+ if !if_exist && returned.is_none() {
+ Err(DataFusionError::Execution(format!(
+ "Memory table {:?} doesn't exist.",
+ name
+ )))
+ } else {
+ let plan = LogicalPlanBuilder::empty(false).build()?;
+ Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
+ }
+ }
+
plan => Ok(Arc::new(DataFrameImpl::new(
self.state.clone(),
&self.optimize(&plan)?,
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index 6faac01..d1e1678 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -212,6 +212,15 @@ pub enum LogicalPlan {
/// The logical plan
input: Arc<LogicalPlan>,
},
+ /// Drops a table.
+ DropTable {
+ /// The table name
+ name: String,
+ /// If the table exists
+ if_exist: bool,
+ /// Dummy schema
+ schema: DFSchemaRef,
+ },
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
@@ -274,6 +283,7 @@ impl LogicalPlan {
LogicalPlan::Extension { node } => node.schema(),
LogicalPlan::Union { schema, .. } => schema,
LogicalPlan::CreateMemoryTable { input, .. } => input.schema(),
+ LogicalPlan::DropTable { schema, .. } => schema,
}
}
@@ -320,6 +330,7 @@ impl LogicalPlan {
| LogicalPlan::Sort { input, .. }
| LogicalPlan::CreateMemoryTable { input, .. }
| LogicalPlan::Filter { input, .. } => input.all_schemas(),
+ LogicalPlan::DropTable { .. } => vec![],
}
}
@@ -366,6 +377,7 @@ impl LogicalPlan {
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. }
+ | LogicalPlan::DropTable { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
@@ -397,7 +409,8 @@ impl LogicalPlan {
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
- | LogicalPlan::CreateExternalTable { .. } => vec![],
+ | LogicalPlan::CreateExternalTable { .. }
+ | LogicalPlan::DropTable { .. } => vec![],
}
}
@@ -545,7 +558,8 @@ impl LogicalPlan {
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
- | LogicalPlan::CreateExternalTable { .. } => true,
+ | LogicalPlan::CreateExternalTable { .. }
+ | LogicalPlan::DropTable { .. } => true,
};
if !recurse {
return Ok(false);
@@ -863,6 +877,11 @@ impl LogicalPlan {
LogicalPlan::CreateMemoryTable { ref name, .. } => {
write!(f, "CreateMemoryTable: {:?}", name)
}
+ LogicalPlan::DropTable {
+ ref name, if_exist, ..
+ } => {
+ write!(f, "DropTable: {:?} if not exist:={}", name, if_exist)
+ }
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union { .. } => write!(f, "Union"),
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 9f2f2af..c88032e 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -206,6 +206,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::CreateMemoryTable { .. }
+ | LogicalPlan::DropTable { .. }
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index 5299b9a..a0bc04a 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -71,6 +71,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Repartition { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. }
+ | LogicalPlan::DropTable { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index a30523d..7c087a1 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -437,6 +437,7 @@ fn optimize_plan(
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. }
+ | LogicalPlan::DropTable { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index c94d48c..52beb69 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -263,7 +263,8 @@ pub fn from_plan(
}
LogicalPlan::EmptyRelation { .. }
| LogicalPlan::TableScan { .. }
- | LogicalPlan::CreateExternalTable { .. } => {
+ | LogicalPlan::CreateExternalTable { .. }
+ | LogicalPlan::DropTable { .. } => {
// All of these plan types have no inputs / exprs so should not be called
assert!(expr.is_empty(), "{:?} should have no exprs", plan);
assert!(inputs.is_empty(), "{:?} should have no inputs", plan);
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index e1170ed..402f119 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -802,7 +802,7 @@ impl DefaultPhysicalPlanner {
Ok(Arc::new(GlobalLimitExec::new(input, limit)))
}
- LogicalPlan::CreateExternalTable { .. }=> {
+ LogicalPlan::CreateExternalTable { .. } => {
// There is no default plan for "CREATE EXTERNAL
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
@@ -811,7 +811,7 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
- | LogicalPlan::CreateMemoryTable {..} => {
+ | LogicalPlan::CreateMemoryTable {..} | LogicalPlan::DropTable {..} => {
// Create a dummy exec.
Ok(Arc::new(EmptyExec::new(
false,
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 2053d76..3e1e76d 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -29,8 +29,8 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
- DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema,
- ToStringifiedPlan,
+ DFSchema, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
+ ToDFSchema, ToStringifiedPlan,
};
use crate::optimizer::utils::exprlist_to_columns;
use crate::prelude::JoinType;
@@ -53,7 +53,7 @@ use sqlparser::ast::{
TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
-use sqlparser::ast::{OrderByExpr, Statement};
+use sqlparser::ast::{ObjectType, OrderByExpr, Statement};
use sqlparser::parser::ParserError::ParserError;
use super::{
@@ -163,6 +163,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
})
}
+ Statement::Drop {
+ object_type: ObjectType::Table,
+ if_exists,
+ names,
+ cascade: _,
+ purge: _,
+ } =>
+ // We don't support cascade and purge for now.
+ {
+ Ok(LogicalPlan::DropTable {
+ name: names.get(0).unwrap().to_string(),
+ if_exist: *if_exists,
+ schema: DFSchemaRef::new(DFSchema::empty()),
+ })
+ }
+
Statement::ShowColumns {
extended,
full,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 2924482..7bba098 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -712,6 +712,26 @@ async fn create_table_as() -> Result<()> {
}
#[tokio::test]
+async fn drop_table() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ register_aggregate_simple_csv(&mut ctx).await?;
+
+ let sql = "CREATE TABLE my_table AS SELECT * FROM aggregate_simple";
+ ctx.sql(sql).await.unwrap();
+
+ let sql = "DROP TABLE my_table";
+ ctx.sql(sql).await.unwrap();
+
+ let result = ctx.table("my_table");
+ assert!(result.is_err(), "drop table should deregister table.");
+
+ let sql = "DROP TABLE IF EXISTS my_table";
+ ctx.sql(sql).await.unwrap();
+
+ Ok(())
+}
+
+#[tokio::test]
async fn select_distinct() -> Result<()> {
let mut ctx = ExecutionContext::new();
register_aggregate_simple_csv(&mut ctx).await?;