You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ho...@apache.org on 2021/11/06 23:33:56 UTC

[arrow-datafusion] branch master updated: Add support for `create table as` via MemTable (#1243)

This is an automated email from the ASF dual-hosted git repository.

houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ac07269  Add support for `create table as` via MemTable (#1243)
ac07269 is described below

commit ac072692387b7dcd84d0af921e959a7e57380bf7
Author: Daniƫl Heres <da...@gmail.com>
AuthorDate: Sun Nov 7 00:33:52 2021 +0100

    Add support for `create table as` via MemTable (#1243)
    
    * Add support for create table as via MemTable
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  3 ++
 datafusion/src/execution/context.rs                | 30 ++++++++++++++----
 datafusion/src/logical_plan/plan.rs                | 15 +++++++++
 .../src/optimizer/common_subexpr_eliminate.rs      |  1 +
 datafusion/src/optimizer/constant_folding.rs       |  1 +
 datafusion/src/optimizer/projection_push_down.rs   |  1 +
 datafusion/src/optimizer/utils.rs                  |  6 ++++
 datafusion/src/physical_plan/planner.rs            |  9 +++++-
 datafusion/src/sql/planner.rs                      | 36 ++++++++++++++++++++--
 datafusion/tests/sql.rs                            | 24 +++++++++++++++
 10 files changed, 116 insertions(+), 10 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 80af698..ca83ffd 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1008,6 +1008,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     ))),
                 })
             }
+            LogicalPlan::CreateMemoryTable { .. } => Err(proto_error(
+                "Error converting CreateMemoryTable. Not yet supported in Ballista",
+            )),
         }
     }
 }
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 81fd785..6295184 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -21,13 +21,16 @@ use crate::{
         catalog::{CatalogList, MemoryCatalogList},
         information_schema::CatalogWithInformationSchema,
     },
-    datasource::file_format::{
-        avro::AvroFormat,
-        csv::CsvFormat,
-        parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
-        FileFormat,
-    },
     datasource::listing::{ListingOptions, ListingTable},
+    datasource::{
+        file_format::{
+            avro::AvroFormat,
+            csv::CsvFormat,
+            parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
+            FileFormat,
+        },
+        MemTable,
+    },
     logical_plan::{PlanType, ToStringifiedPlan},
     optimizer::eliminate_limit::EliminateLimit,
     physical_optimizer::{
@@ -237,6 +240,21 @@ impl ExecutionContext {
                 Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
             }
 
+            LogicalPlan::CreateMemoryTable { input, name } => {
+                let plan = self.optimize(&input)?;
+                let physical = Arc::new(DataFrameImpl::new(self.state.clone(), &plan));
+
+                let batches: Vec<_> = physical.collect_partitioned().await?;
+                let table = Arc::new(MemTable::try_new(
+                    Arc::new(plan.schema().as_ref().into()),
+                    batches,
+                )?);
+                self.register_table(name.as_str(), table)?;
+
+                let plan = LogicalPlanBuilder::empty(false).build()?;
+                Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
+            }
+
             plan => Ok(Arc::new(DataFrameImpl::new(
                 self.state.clone(),
                 &self.optimize(&plan)?,
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index f53d8e6..6faac01 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -205,6 +205,13 @@ pub enum LogicalPlan {
         /// Whether the CSV file contains a header
         has_header: bool,
     },
+    /// Creates an in memory table.
+    CreateMemoryTable {
+        /// The table name
+        name: String,
+        /// The logical plan
+        input: Arc<LogicalPlan>,
+    },
     /// Values expression. See
     /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
     /// documentation for more details.
@@ -266,6 +273,7 @@ impl LogicalPlan {
             LogicalPlan::Analyze { schema, .. } => schema,
             LogicalPlan::Extension { node } => node.schema(),
             LogicalPlan::Union { schema, .. } => schema,
+            LogicalPlan::CreateMemoryTable { input, .. } => input.schema(),
         }
     }
 
@@ -310,6 +318,7 @@ impl LogicalPlan {
             LogicalPlan::Limit { input, .. }
             | LogicalPlan::Repartition { input, .. }
             | LogicalPlan::Sort { input, .. }
+            | LogicalPlan::CreateMemoryTable { input, .. }
             | LogicalPlan::Filter { input, .. } => input.all_schemas(),
         }
     }
@@ -356,6 +365,7 @@ impl LogicalPlan {
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Limit { .. }
             | LogicalPlan::CreateExternalTable { .. }
+            | LogicalPlan::CreateMemoryTable { .. }
             | LogicalPlan::CrossJoin { .. }
             | LogicalPlan::Analyze { .. }
             | LogicalPlan::Explain { .. }
@@ -382,6 +392,7 @@ impl LogicalPlan {
             LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
             LogicalPlan::Explain { plan, .. } => vec![plan],
             LogicalPlan::Analyze { input: plan, .. } => vec![plan],
+            LogicalPlan::CreateMemoryTable { input, .. } => vec![input],
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
@@ -519,6 +530,7 @@ impl LogicalPlan {
                 true
             }
             LogicalPlan::Limit { input, .. } => input.accept(visitor)?,
+            LogicalPlan::CreateMemoryTable { input, .. } => input.accept(visitor)?,
             LogicalPlan::Extension { node } => {
                 for input in node.inputs() {
                     if !input.accept(visitor)? {
@@ -848,6 +860,9 @@ impl LogicalPlan {
                     LogicalPlan::CreateExternalTable { ref name, .. } => {
                         write!(f, "CreateExternalTable: {:?}", name)
                     }
+                    LogicalPlan::CreateMemoryTable { ref name, .. } => {
+                        write!(f, "CreateMemoryTable: {:?}", name)
+                    }
                     LogicalPlan::Explain { .. } => write!(f, "Explain"),
                     LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                     LogicalPlan::Union { .. } => write!(f, "Union"),
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 0e97663..9f2f2af 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -205,6 +205,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
         | LogicalPlan::CreateExternalTable { .. }
         | LogicalPlan::Explain { .. }
         | LogicalPlan::Analyze { .. }
+        | LogicalPlan::CreateMemoryTable { .. }
         | LogicalPlan::Extension { .. } => {
             // apply the optimization to all inputs of the plan
             let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index 7249160..5299b9a 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -70,6 +70,7 @@ impl OptimizerRule for ConstantFolding {
             | LogicalPlan::Aggregate { .. }
             | LogicalPlan::Repartition { .. }
             | LogicalPlan::CreateExternalTable { .. }
+            | LogicalPlan::CreateMemoryTable { .. }
             | LogicalPlan::Values { .. }
             | LogicalPlan::Extension { .. }
             | LogicalPlan::Sort { .. }
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 78ea278..a30523d 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -436,6 +436,7 @@ fn optimize_plan(
         | LogicalPlan::Values { .. }
         | LogicalPlan::Sort { .. }
         | LogicalPlan::CreateExternalTable { .. }
+        | LogicalPlan::CreateMemoryTable { .. }
         | LogicalPlan::CrossJoin { .. }
         | LogicalPlan::Extension { .. } => {
             let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 8ce0d3a..c94d48c 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -222,6 +222,12 @@ pub fn from_plan(
             n: *n,
             input: Arc::new(inputs[0].clone()),
         }),
+        LogicalPlan::CreateMemoryTable { name, .. } => {
+            Ok(LogicalPlan::CreateMemoryTable {
+                input: Arc::new(inputs[0].clone()),
+                name: name.clone(),
+            })
+        }
         LogicalPlan::Extension { node } => Ok(LogicalPlan::Extension {
             node: node.from_template(expr, inputs),
         }),
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 12e7401..e1170ed 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -802,7 +802,7 @@ impl DefaultPhysicalPlanner {
 
                     Ok(Arc::new(GlobalLimitExec::new(input, limit)))
                 }
-                LogicalPlan::CreateExternalTable { .. } => {
+                LogicalPlan::CreateExternalTable { .. }=> {
                     // There is no default plan for "CREATE EXTERNAL
                     // TABLE" -- it must be handled at a higher level (so
                     // that the appropriate table can be registered with
@@ -811,6 +811,13 @@ impl DefaultPhysicalPlanner {
                         "Unsupported logical plan: CreateExternalTable".to_string(),
                     ))
                 }
+                | LogicalPlan::CreateMemoryTable {..} => {
+                    // Create a dummy exec.
+                    Ok(Arc::new(EmptyExec::new(
+                        false,
+                        SchemaRef::new(Schema::empty()),
+                    )))
+                }
                 LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
                     "Unsupported logical plan: Explain must be root of the plan".to_string(),
                 )),
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index de09632..50875a8 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -48,9 +48,9 @@ use arrow::datatypes::*;
 use hashbrown::HashMap;
 use sqlparser::ast::{
     BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
-    Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem,
-    SetExpr, SetOperator, ShowStatementFilter, TableFactor, TableWithJoins,
-    TrimWhereField, UnaryOperator, Value, Values as SQLValues,
+    HiveDistributionStyle, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
+    Select, SelectItem, SetExpr, SetOperator, ShowStatementFilter, TableFactor,
+    TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
 };
 use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
 use sqlparser::ast::{OrderByExpr, Statement};
@@ -133,6 +133,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             } => self.explain_statement_to_plan(*verbose, *analyze, statement),
             Statement::Query(query) => self.query_to_plan(query),
             Statement::ShowVariable { variable } => self.show_variable_to_plan(variable),
+            Statement::CreateTable {
+                query: Some(query),
+                name,
+                or_replace: false,
+                columns,
+                constraints,
+                hive_distribution: HiveDistributionStyle::NONE,
+                hive_formats: _hive_formats,
+                table_properties,
+                with_options,
+                file_format: None,
+                location: None,
+                like: None,
+                temporary: _temporary,
+                external: false,
+                if_not_exists: false,
+                without_rowid: _without_row_id,
+            } if columns.is_empty()
+                && constraints.is_empty()
+                && table_properties.is_empty()
+                && with_options.is_empty() =>
+            {
+                let plan = self.query_to_plan(query)?;
+
+                Ok(LogicalPlan::CreateMemoryTable {
+                    name: name.to_string(),
+                    input: Arc::new(plan),
+                })
+            }
+
             Statement::ShowColumns {
                 extended,
                 full,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 996908a..e6064af 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -688,6 +688,30 @@ async fn select_all() -> Result<()> {
 }
 
 #[tokio::test]
+async fn create_table_as() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_simple_csv(&mut ctx).await?;
+
+    let sql = "CREATE TABLE my_table AS SELECT * FROM aggregate_simple";
+    ctx.sql(sql).await.unwrap();
+
+    let sql_all = "SELECT * FROM my_table order by c1 LIMIT 1";
+    let results_all = execute_to_batches(&mut ctx, sql_all).await;
+
+    let expected = vec![
+        "+---------+----------------+------+",
+        "| c1      | c2             | c3   |",
+        "+---------+----------------+------+",
+        "| 0.00001 | 0.000000000001 | true |",
+        "+---------+----------------+------+",
+    ];
+
+    assert_batches_eq!(expected, &results_all);
+
+    Ok(())
+}
+
+#[tokio::test]
 async fn select_distinct() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_simple_csv(&mut ctx).await?;