You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/11/10 07:04:54 UTC

[arrow-datafusion] branch master updated: Add drop table support (#1266)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7060961  Add drop table support (#1266)
7060961 is described below

commit 7060961fd944efa48c7ed93677041f4d089b5da5
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Nov 9 23:04:49 2021 -0800

    Add drop table support (#1266)
    
    * Add drop table support.
    
    * Rename to DropTable.
    
    * Remove undeclared crate.
    
    * Fix clippy error.
    
    * For review comments.
---
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  3 +++
 datafusion/src/execution/context.rs                | 13 ++++++++++++
 datafusion/src/logical_plan/plan.rs                | 23 ++++++++++++++++++++--
 .../src/optimizer/common_subexpr_eliminate.rs      |  1 +
 datafusion/src/optimizer/constant_folding.rs       |  1 +
 datafusion/src/optimizer/projection_push_down.rs   |  1 +
 datafusion/src/optimizer/utils.rs                  |  3 ++-
 datafusion/src/physical_plan/planner.rs            |  4 ++--
 datafusion/src/sql/planner.rs                      | 22 ++++++++++++++++++---
 datafusion/tests/sql.rs                            | 20 +++++++++++++++++++
 10 files changed, 83 insertions(+), 8 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index ca83ffd..e4c7656 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1011,6 +1011,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
             LogicalPlan::CreateMemoryTable { .. } => Err(proto_error(
                 "Error converting CreateMemoryTable. Not yet supported in Ballista",
             )),
+            LogicalPlan::DropTable { .. } => Err(proto_error(
+                "Error converting DropTable. Not yet supported in Ballista",
+            )),
         }
     }
 }
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 99358d6..78a7884 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -255,6 +255,19 @@ impl ExecutionContext {
                 Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
             }
 
+            LogicalPlan::DropTable { name, if_exist, .. } => {
+                let returned = self.deregister_table(name.as_str())?;
+                if !if_exist && returned.is_none() {
+                    Err(DataFusionError::Execution(format!(
+                        "Memory table {:?} doesn't exist.",
+                        name
+                    )))
+                } else {
+                    let plan = LogicalPlanBuilder::empty(false).build()?;
+                    Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
+                }
+            }
+
             plan => Ok(Arc::new(DataFrameImpl::new(
                 self.state.clone(),
                 &self.optimize(&plan)?,
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index 6faac01..d1e1678 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -212,6 +212,15 @@ pub enum LogicalPlan {
         /// The logical plan
         input: Arc<LogicalPlan>,
     },
+    /// Drops a table.
+    DropTable {
+        /// The table name
+        name: String,
+        /// If true, do not error when the table does not exist (`IF EXISTS`)
+        if_exist: bool,
+        /// Dummy (empty) schema, since dropping a table yields no columns
+        schema: DFSchemaRef,
+    },
     /// Values expression. See
     /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
     /// documentation for more details.
@@ -274,6 +283,7 @@ impl LogicalPlan {
             LogicalPlan::Extension { node } => node.schema(),
             LogicalPlan::Union { schema, .. } => schema,
             LogicalPlan::CreateMemoryTable { input, .. } => input.schema(),
+            LogicalPlan::DropTable { schema, .. } => schema,
         }
     }
 
@@ -320,6 +330,7 @@ impl LogicalPlan {
             | LogicalPlan::Sort { input, .. }
             | LogicalPlan::CreateMemoryTable { input, .. }
             | LogicalPlan::Filter { input, .. } => input.all_schemas(),
+            LogicalPlan::DropTable { .. } => vec![],
         }
     }
 
@@ -366,6 +377,7 @@ impl LogicalPlan {
             | LogicalPlan::Limit { .. }
             | LogicalPlan::CreateExternalTable { .. }
             | LogicalPlan::CreateMemoryTable { .. }
+            | LogicalPlan::DropTable { .. }
             | LogicalPlan::CrossJoin { .. }
             | LogicalPlan::Analyze { .. }
             | LogicalPlan::Explain { .. }
@@ -397,7 +409,8 @@ impl LogicalPlan {
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Values { .. }
-            | LogicalPlan::CreateExternalTable { .. } => vec![],
+            | LogicalPlan::CreateExternalTable { .. }
+            | LogicalPlan::DropTable { .. } => vec![],
         }
     }
 
@@ -545,7 +558,8 @@ impl LogicalPlan {
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Values { .. }
-            | LogicalPlan::CreateExternalTable { .. } => true,
+            | LogicalPlan::CreateExternalTable { .. }
+            | LogicalPlan::DropTable { .. } => true,
         };
         if !recurse {
             return Ok(false);
@@ -863,6 +877,11 @@ impl LogicalPlan {
                     LogicalPlan::CreateMemoryTable { ref name, .. } => {
                         write!(f, "CreateMemoryTable: {:?}", name)
                     }
+                    LogicalPlan::DropTable {
+                        ref name, if_exist, ..
+                    } => {
+                        write!(f, "DropTable: {:?} if not exist:={}", name, if_exist)
+                    }
                     LogicalPlan::Explain { .. } => write!(f, "Explain"),
                     LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                     LogicalPlan::Union { .. } => write!(f, "Union"),
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 9f2f2af..c88032e 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -206,6 +206,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
         | LogicalPlan::Explain { .. }
         | LogicalPlan::Analyze { .. }
         | LogicalPlan::CreateMemoryTable { .. }
+        | LogicalPlan::DropTable { .. }
         | LogicalPlan::Extension { .. } => {
             // apply the optimization to all inputs of the plan
             let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index 5299b9a..a0bc04a 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -71,6 +71,7 @@ impl OptimizerRule for ConstantFolding {
             | LogicalPlan::Repartition { .. }
             | LogicalPlan::CreateExternalTable { .. }
             | LogicalPlan::CreateMemoryTable { .. }
+            | LogicalPlan::DropTable { .. }
             | LogicalPlan::Values { .. }
             | LogicalPlan::Extension { .. }
             | LogicalPlan::Sort { .. }
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index a30523d..7c087a1 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -437,6 +437,7 @@ fn optimize_plan(
         | LogicalPlan::Sort { .. }
         | LogicalPlan::CreateExternalTable { .. }
         | LogicalPlan::CreateMemoryTable { .. }
+        | LogicalPlan::DropTable { .. }
         | LogicalPlan::CrossJoin { .. }
         | LogicalPlan::Extension { .. } => {
             let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index c94d48c..52beb69 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -263,7 +263,8 @@ pub fn from_plan(
         }
         LogicalPlan::EmptyRelation { .. }
         | LogicalPlan::TableScan { .. }
-        | LogicalPlan::CreateExternalTable { .. } => {
+        | LogicalPlan::CreateExternalTable { .. }
+        | LogicalPlan::DropTable { .. } => {
             // All of these plan types have no inputs / exprs so should not be called
             assert!(expr.is_empty(), "{:?} should have no exprs", plan);
             assert!(inputs.is_empty(), "{:?}  should have no inputs", plan);
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index e1170ed..402f119 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -802,7 +802,7 @@ impl DefaultPhysicalPlanner {
 
                     Ok(Arc::new(GlobalLimitExec::new(input, limit)))
                 }
-                LogicalPlan::CreateExternalTable { .. }=> {
+                LogicalPlan::CreateExternalTable { .. } => {
                     // There is no default plan for "CREATE EXTERNAL
                     // TABLE" -- it must be handled at a higher level (so
                     // that the appropriate table can be registered with
@@ -811,7 +811,7 @@ impl DefaultPhysicalPlanner {
                         "Unsupported logical plan: CreateExternalTable".to_string(),
                     ))
                 }
-                | LogicalPlan::CreateMemoryTable {..} => {
+                | LogicalPlan::CreateMemoryTable {..} | LogicalPlan::DropTable {..} => {
                     // Create a dummy exec.
                     Ok(Arc::new(EmptyExec::new(
                         false,
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 2053d76..3e1e76d 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -29,8 +29,8 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
 use crate::logical_plan::Expr::Alias;
 use crate::logical_plan::{
     and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
-    DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema,
-    ToStringifiedPlan,
+    DFSchema, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
+    ToDFSchema, ToStringifiedPlan,
 };
 use crate::optimizer::utils::exprlist_to_columns;
 use crate::prelude::JoinType;
@@ -53,7 +53,7 @@ use sqlparser::ast::{
     TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
 };
 use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
-use sqlparser::ast::{OrderByExpr, Statement};
+use sqlparser::ast::{ObjectType, OrderByExpr, Statement};
 use sqlparser::parser::ParserError::ParserError;
 
 use super::{
@@ -163,6 +163,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 })
             }
 
+            Statement::Drop {
+                object_type: ObjectType::Table,
+                if_exists,
+                names,
+                cascade: _,
+                purge: _,
+            } =>
+            // We don't support cascade and purge for now; only the first name in `names` is dropped.
+            {
+                Ok(LogicalPlan::DropTable {
+                    name: names.get(0).unwrap().to_string(),
+                    if_exist: *if_exists,
+                    schema: DFSchemaRef::new(DFSchema::empty()),
+                })
+            }
+
             Statement::ShowColumns {
                 extended,
                 full,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 2924482..7bba098 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -712,6 +712,26 @@ async fn create_table_as() -> Result<()> {
 }
 
 #[tokio::test]
+async fn drop_table() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_simple_csv(&mut ctx).await?;
+    // Create `my_table` so that there is something to drop.
+    let sql = "CREATE TABLE my_table AS SELECT * FROM aggregate_simple";
+    ctx.sql(sql).await.unwrap();
+    // A plain DROP TABLE on an existing table must succeed...
+    let sql = "DROP TABLE my_table";
+    ctx.sql(sql).await.unwrap();
+    // ...and afterwards the table can no longer be looked up in the context.
+    let result = ctx.table("my_table");
+    assert!(result.is_err(), "drop table should deregister table.");
+    // With IF EXISTS, dropping the now-missing table must not be an error.
+    let sql = "DROP TABLE IF EXISTS my_table";
+    ctx.sql(sql).await.unwrap();
+
+    Ok(())
+}
+
+#[tokio::test]
 async fn select_distinct() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_simple_csv(&mut ctx).await?;