You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ho...@apache.org on 2021/11/06 23:33:56 UTC
[arrow-datafusion] branch master updated: Add support for `create
table as` via MemTable (#1243)
This is an automated email from the ASF dual-hosted git repository.
houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ac07269 Add support for `create table as` via MemTable (#1243)
ac07269 is described below
commit ac072692387b7dcd84d0af921e959a7e57380bf7
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Sun Nov 7 00:33:52 2021 +0100
Add support for `create table as` via MemTable (#1243)
* Add support for create table as via MemTable
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
.../rust/core/src/serde/logical_plan/to_proto.rs | 3 ++
datafusion/src/execution/context.rs | 30 ++++++++++++++----
datafusion/src/logical_plan/plan.rs | 15 +++++++++
.../src/optimizer/common_subexpr_eliminate.rs | 1 +
datafusion/src/optimizer/constant_folding.rs | 1 +
datafusion/src/optimizer/projection_push_down.rs | 1 +
datafusion/src/optimizer/utils.rs | 6 ++++
datafusion/src/physical_plan/planner.rs | 9 +++++-
datafusion/src/sql/planner.rs | 36 ++++++++++++++++++++--
datafusion/tests/sql.rs | 24 +++++++++++++++
10 files changed, 116 insertions(+), 10 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 80af698..ca83ffd 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1008,6 +1008,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
+ LogicalPlan::CreateMemoryTable { .. } => Err(proto_error(
+ "Error converting CreateMemoryTable. Not yet supported in Ballista",
+ )),
}
}
}
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 81fd785..6295184 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -21,13 +21,16 @@ use crate::{
catalog::{CatalogList, MemoryCatalogList},
information_schema::CatalogWithInformationSchema,
},
- datasource::file_format::{
- avro::AvroFormat,
- csv::CsvFormat,
- parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
- FileFormat,
- },
datasource::listing::{ListingOptions, ListingTable},
+ datasource::{
+ file_format::{
+ avro::AvroFormat,
+ csv::CsvFormat,
+ parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
+ FileFormat,
+ },
+ MemTable,
+ },
logical_plan::{PlanType, ToStringifiedPlan},
optimizer::eliminate_limit::EliminateLimit,
physical_optimizer::{
@@ -237,6 +240,21 @@ impl ExecutionContext {
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
+ LogicalPlan::CreateMemoryTable { input, name } => {
+ let plan = self.optimize(&input)?;
+ let physical = Arc::new(DataFrameImpl::new(self.state.clone(), &plan));
+
+ let batches: Vec<_> = physical.collect_partitioned().await?;
+ let table = Arc::new(MemTable::try_new(
+ Arc::new(plan.schema().as_ref().into()),
+ batches,
+ )?);
+ self.register_table(name.as_str(), table)?;
+
+ let plan = LogicalPlanBuilder::empty(false).build()?;
+ Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
+ }
+
plan => Ok(Arc::new(DataFrameImpl::new(
self.state.clone(),
&self.optimize(&plan)?,
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index f53d8e6..6faac01 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -205,6 +205,13 @@ pub enum LogicalPlan {
/// Whether the CSV file contains a header
has_header: bool,
},
+ /// Creates an in memory table.
+ CreateMemoryTable {
+ /// The table name
+ name: String,
+ /// The logical plan
+ input: Arc<LogicalPlan>,
+ },
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
@@ -266,6 +273,7 @@ impl LogicalPlan {
LogicalPlan::Analyze { schema, .. } => schema,
LogicalPlan::Extension { node } => node.schema(),
LogicalPlan::Union { schema, .. } => schema,
+ LogicalPlan::CreateMemoryTable { input, .. } => input.schema(),
}
}
@@ -310,6 +318,7 @@ impl LogicalPlan {
LogicalPlan::Limit { input, .. }
| LogicalPlan::Repartition { input, .. }
| LogicalPlan::Sort { input, .. }
+ | LogicalPlan::CreateMemoryTable { input, .. }
| LogicalPlan::Filter { input, .. } => input.all_schemas(),
}
}
@@ -356,6 +365,7 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
+ | LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
@@ -382,6 +392,7 @@ impl LogicalPlan {
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
LogicalPlan::Explain { plan, .. } => vec![plan],
LogicalPlan::Analyze { input: plan, .. } => vec![plan],
+ LogicalPlan::CreateMemoryTable { input, .. } => vec![input],
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
@@ -519,6 +530,7 @@ impl LogicalPlan {
true
}
LogicalPlan::Limit { input, .. } => input.accept(visitor)?,
+ LogicalPlan::CreateMemoryTable { input, .. } => input.accept(visitor)?,
LogicalPlan::Extension { node } => {
for input in node.inputs() {
if !input.accept(visitor)? {
@@ -848,6 +860,9 @@ impl LogicalPlan {
LogicalPlan::CreateExternalTable { ref name, .. } => {
write!(f, "CreateExternalTable: {:?}", name)
}
+ LogicalPlan::CreateMemoryTable { ref name, .. } => {
+ write!(f, "CreateMemoryTable: {:?}", name)
+ }
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union { .. } => write!(f, "Union"),
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 0e97663..9f2f2af 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -205,6 +205,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
+ | LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index 7249160..5299b9a 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -70,6 +70,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Aggregate { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::CreateExternalTable { .. }
+ | LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 78ea278..a30523d 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -436,6 +436,7 @@ fn optimize_plan(
| LogicalPlan::Values { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
+ | LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 8ce0d3a..c94d48c 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -222,6 +222,12 @@ pub fn from_plan(
n: *n,
input: Arc::new(inputs[0].clone()),
}),
+ LogicalPlan::CreateMemoryTable { name, .. } => {
+ Ok(LogicalPlan::CreateMemoryTable {
+ input: Arc::new(inputs[0].clone()),
+ name: name.clone(),
+ })
+ }
LogicalPlan::Extension { node } => Ok(LogicalPlan::Extension {
node: node.from_template(expr, inputs),
}),
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 12e7401..e1170ed 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -802,7 +802,7 @@ impl DefaultPhysicalPlanner {
Ok(Arc::new(GlobalLimitExec::new(input, limit)))
}
- LogicalPlan::CreateExternalTable { .. } => {
+ LogicalPlan::CreateExternalTable { .. }=> {
// There is no default plan for "CREATE EXTERNAL
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
@@ -811,6 +811,13 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
+ | LogicalPlan::CreateMemoryTable {..} => {
+ // Create a dummy exec.
+ Ok(Arc::new(EmptyExec::new(
+ false,
+ SchemaRef::new(Schema::empty()),
+ )))
+ }
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index de09632..50875a8 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -48,9 +48,9 @@ use arrow::datatypes::*;
use hashbrown::HashMap;
use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
- Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem,
- SetExpr, SetOperator, ShowStatementFilter, TableFactor, TableWithJoins,
- TrimWhereField, UnaryOperator, Value, Values as SQLValues,
+ HiveDistributionStyle, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
+ Select, SelectItem, SetExpr, SetOperator, ShowStatementFilter, TableFactor,
+ TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{OrderByExpr, Statement};
@@ -133,6 +133,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} => self.explain_statement_to_plan(*verbose, *analyze, statement),
Statement::Query(query) => self.query_to_plan(query),
Statement::ShowVariable { variable } => self.show_variable_to_plan(variable),
+ Statement::CreateTable {
+ query: Some(query),
+ name,
+ or_replace: false,
+ columns,
+ constraints,
+ hive_distribution: HiveDistributionStyle::NONE,
+ hive_formats: _hive_formats,
+ table_properties,
+ with_options,
+ file_format: None,
+ location: None,
+ like: None,
+ temporary: _temporary,
+ external: false,
+ if_not_exists: false,
+ without_rowid: _without_row_id,
+ } if columns.is_empty()
+ && constraints.is_empty()
+ && table_properties.is_empty()
+ && with_options.is_empty() =>
+ {
+ let plan = self.query_to_plan(query)?;
+
+ Ok(LogicalPlan::CreateMemoryTable {
+ name: name.to_string(),
+ input: Arc::new(plan),
+ })
+ }
+
Statement::ShowColumns {
extended,
full,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 996908a..e6064af 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -688,6 +688,30 @@ async fn select_all() -> Result<()> {
}
#[tokio::test]
+async fn create_table_as() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ register_aggregate_simple_csv(&mut ctx).await?;
+
+ let sql = "CREATE TABLE my_table AS SELECT * FROM aggregate_simple";
+ ctx.sql(sql).await.unwrap();
+
+ let sql_all = "SELECT * FROM my_table order by c1 LIMIT 1";
+ let results_all = execute_to_batches(&mut ctx, sql_all).await;
+
+ let expected = vec![
+ "+---------+----------------+------+",
+ "| c1 | c2 | c3 |",
+ "+---------+----------------+------+",
+ "| 0.00001 | 0.000000000001 | true |",
+ "+---------+----------------+------+",
+ ];
+
+ assert_batches_eq!(expected, &results_all);
+
+ Ok(())
+}
+
+#[tokio::test]
async fn select_distinct() -> Result<()> {
let mut ctx = ExecutionContext::new();
register_aggregate_simple_csv(&mut ctx).await?;