You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/16 18:52:04 UTC
[arrow] branch master updated: ARROW-9758: [Rust] [DataFusion]
Allow physical planner to be replaced
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2c58141 ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced
2c58141 is described below
commit 2c58141284c1220652a1e48390a43c4da9918848
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Aug 16 12:51:18 2020 -0600
ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced
This PR mainly moves the physical query planning logic out of ExecutionContext and into its own struct. There is a new `PhysicalPlanner` trait, so you can now bring your own planner if the one in DataFusion doesn't meet your needs (for example, if you need to implement distributed query execution). The batch size is now part of `ExecutionConfig` (set via `with_batch_size`), so it is no longer passed to `sql`, `collect`, or `create_physical_plan`.
Pseudo-code example:
```rust
let config = ExecutionConfig::new().with_physical_planner(Arc::new(MyPhysicalPlanner::new()));
let mut ctx = ExecutionContext::with_config(config);
ctx.sql("SELECT * FROM foo")?;
```
Closes #7975 from andygrove/physical-planner-trait
Authored-by: Andy Grove <an...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/benchmarks/src/bin/nyctaxi.rs | 18 +-
rust/benchmarks/src/bin/tpch.rs | 18 +-
rust/datafusion/benches/aggregate_query_sql.rs | 2 +-
rust/datafusion/examples/csv_sql.rs | 8 +-
rust/datafusion/examples/dataframe.rs | 3 +-
rust/datafusion/examples/flight_server.rs | 2 +-
rust/datafusion/examples/memory_table_api.rs | 2 +-
rust/datafusion/examples/parquet_sql.rs | 7 +-
rust/datafusion/src/bin/repl.rs | 15 +-
rust/datafusion/src/dataframe.rs | 6 +-
rust/datafusion/src/execution/context.rs | 500 ++++-----------------
rust/datafusion/src/execution/dataframe_impl.rs | 4 +-
rust/datafusion/src/execution/physical_plan/mod.rs | 14 +-
.../src/execution/physical_plan/planner.rs | 467 +++++++++++++++++++
rust/datafusion/tests/sql.rs | 19 +-
15 files changed, 617 insertions(+), 468 deletions(-)
diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs
index 3aa25d2..7468400 100644
--- a/rust/benchmarks/src/bin/nyctaxi.rs
+++ b/rust/benchmarks/src/bin/nyctaxi.rs
@@ -62,7 +62,9 @@ fn main() -> Result<()> {
let opt = Opt::from_args();
println!("Running benchmarks with the following options: {:?}", opt);
- let config = ExecutionConfig::new().with_concurrency(opt.concurrency);
+ let config = ExecutionConfig::new()
+ .with_concurrency(opt.concurrency)
+ .with_batch_size(opt.batch_size);
let mut ctx = ExecutionContext::with_config(config);
let path = opt.path.to_str().unwrap();
@@ -80,13 +82,12 @@ fn main() -> Result<()> {
}
}
- datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.batch_size, opt.debug)
+ datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.debug)
}
fn datafusion_sql_benchmarks(
ctx: &mut ExecutionContext,
iterations: usize,
- batch_size: usize,
debug: bool,
) -> Result<()> {
let mut queries = HashMap::new();
@@ -95,7 +96,7 @@ fn datafusion_sql_benchmarks(
println!("Executing '{}'", name);
for i in 0..iterations {
let start = Instant::now();
- execute_sql(ctx, sql, batch_size, debug)?;
+ execute_sql(ctx, sql, debug)?;
println!(
"Query '{}' iteration {} took {} ms",
name,
@@ -107,18 +108,13 @@ fn datafusion_sql_benchmarks(
Ok(())
}
-fn execute_sql(
- ctx: &mut ExecutionContext,
- sql: &str,
- batch_size: usize,
- debug: bool,
-) -> Result<()> {
+fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.optimize(&plan)?;
if debug {
println!("Optimized logical plan:\n{:?}", plan);
}
- let physical_plan = ctx.create_physical_plan(&plan, batch_size)?;
+ let physical_plan = ctx.create_physical_plan(&plan)?;
let result = ctx.collect(physical_plan.as_ref())?;
if debug {
pretty::print_batches(&result)?;
diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs
index a8a3b96..e556dc5 100644
--- a/rust/benchmarks/src/bin/tpch.rs
+++ b/rust/benchmarks/src/bin/tpch.rs
@@ -65,9 +65,10 @@ fn main() -> Result<()> {
let opt = TpchOpt::from_args();
println!("Running benchmarks with the following options: {:?}", opt);
- let mut ctx = ExecutionContext::with_config(
- ExecutionConfig::new().with_concurrency(opt.concurrency),
- );
+ let config = ExecutionConfig::new()
+ .with_concurrency(opt.concurrency)
+ .with_batch_size(opt.batch_size);
+ let mut ctx = ExecutionContext::with_config(config);
let path = opt.path.to_str().unwrap();
@@ -118,7 +119,7 @@ fn main() -> Result<()> {
for i in 0..opt.iterations {
let start = Instant::now();
- execute_sql(&mut ctx, sql, opt.batch_size, opt.debug)?;
+ execute_sql(&mut ctx, sql, opt.debug)?;
println!(
"Query {} iteration {} took {} ms",
opt.query,
@@ -130,18 +131,13 @@ fn main() -> Result<()> {
Ok(())
}
-fn execute_sql(
- ctx: &mut ExecutionContext,
- sql: &str,
- batch_size: usize,
- debug: bool,
-) -> Result<()> {
+fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.optimize(&plan)?;
if debug {
println!("Optimized logical plan:\n{:?}", plan);
}
- let physical_plan = ctx.create_physical_plan(&plan, batch_size)?;
+ let physical_plan = ctx.create_physical_plan(&plan)?;
let result = ctx.collect(physical_plan.as_ref())?;
if debug {
pretty::print_batches(&result)?;
diff --git a/rust/datafusion/benches/aggregate_query_sql.rs b/rust/datafusion/benches/aggregate_query_sql.rs
index ab374a0..b42e7fc 100644
--- a/rust/datafusion/benches/aggregate_query_sql.rs
+++ b/rust/datafusion/benches/aggregate_query_sql.rs
@@ -32,7 +32,7 @@ use datafusion::execution::context::ExecutionContext;
fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
// execute the query
- let results = ctx.sql(&sql, 1024 * 1024).unwrap();
+ let results = ctx.sql(&sql).unwrap();
// display the relation
for _batch in results {}
diff --git a/rust/datafusion/examples/csv_sql.rs b/rust/datafusion/examples/csv_sql.rs
index df17d44..a5f3837 100644
--- a/rust/datafusion/examples/csv_sql.rs
+++ b/rust/datafusion/examples/csv_sql.rs
@@ -37,8 +37,12 @@ fn main() -> Result<()> {
)?;
// execute the query
- let batch_size = 4096;
- let results = ctx.sql("SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1", batch_size)?;
+ let results = ctx.sql(
+ "SELECT c1, MIN(c12), MAX(c12) \
+ FROM aggregate_test_100 \
+ WHERE c11 > 0.1 AND c11 < 0.9 \
+ GROUP BY c1",
+ )?;
// print the results
pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/dataframe.rs b/rust/datafusion/examples/dataframe.rs
index df9a09b..4b931b6 100644
--- a/rust/datafusion/examples/dataframe.rs
+++ b/rust/datafusion/examples/dataframe.rs
@@ -38,8 +38,7 @@ fn main() -> Result<()> {
.filter(col("tinyint_col").lt(col("tinyint_col")))?;
// execute the query
- let batch_size = 4096;
- let results = df.collect(batch_size)?;
+ let results = df.collect()?;
// print the results
pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs
index a3eff83..c71a758 100644
--- a/rust/datafusion/examples/flight_server.rs
+++ b/rust/datafusion/examples/flight_server.rs
@@ -99,7 +99,7 @@ impl FlightService for FlightServiceImpl {
let plan = ctx
.create_logical_plan(&sql)
.and_then(|plan| ctx.optimize(&plan))
- .and_then(|plan| ctx.create_physical_plan(&plan, 1024 * 1024))
+ .and_then(|plan| ctx.create_physical_plan(&plan))
.map_err(|e| to_tonic_err(&e))?;
// execute the query
diff --git a/rust/datafusion/examples/memory_table_api.rs b/rust/datafusion/examples/memory_table_api.rs
index 9f6c057..ee85785 100644
--- a/rust/datafusion/examples/memory_table_api.rs
+++ b/rust/datafusion/examples/memory_table_api.rs
@@ -59,7 +59,7 @@ fn main() -> Result<()> {
let df = df.select_columns(vec!["a", "b"])?.filter(filter)?;
// execute
- let results = df.collect(10)?;
+ let results = df.collect()?;
// print the results
pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/parquet_sql.rs b/rust/datafusion/examples/parquet_sql.rs
index d05cf26..f73a2ae 100644
--- a/rust/datafusion/examples/parquet_sql.rs
+++ b/rust/datafusion/examples/parquet_sql.rs
@@ -35,8 +35,11 @@ fn main() -> Result<()> {
)?;
// execute the query
- let batch_size = 4096;
- let results = ctx.sql("SELECT int_col, double_col, CAST(date_string_col as VARCHAR) FROM alltypes_plain WHERE id > 1 AND tinyint_col < double_col", batch_size)?;
+ let results = ctx.sql(
+ "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
+ FROM alltypes_plain \
+ WHERE id > 1 AND tinyint_col < double_col",
+ )?;
// print the results
pretty::print_batches(&results)?;
diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs
index 4a8464b..74d4320 100644
--- a/rust/datafusion/src/bin/repl.rs
+++ b/rust/datafusion/src/bin/repl.rs
@@ -20,7 +20,7 @@
use arrow::util::pretty;
use clap::{crate_version, App, Arg};
use datafusion::error::Result;
-use datafusion::execution::context::ExecutionContext;
+use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use rustyline::Editor;
use std::env;
use std::path::Path;
@@ -60,7 +60,8 @@ pub fn main() {
.map(|size| size.parse::<usize>().unwrap())
.unwrap_or(1_048_576);
- let mut ctx = ExecutionContext::new();
+ let mut ctx =
+ ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(batch_size));
let mut rl = Editor::<()>::new();
rl.load_history(".history").ok();
@@ -75,7 +76,7 @@ pub fn main() {
Ok(ref line) if line.trim_end().ends_with(';') => {
query.push_str(line.trim_end());
rl.add_history_entry(query.clone());
- match exec_and_print(&mut ctx, query, batch_size) {
+ match exec_and_print(&mut ctx, query) {
Ok(_) => {}
Err(err) => println!("{:?}", err),
}
@@ -99,14 +100,10 @@ fn is_exit_command(line: &str) -> bool {
line == "quit" || line == "exit"
}
-fn exec_and_print(
- ctx: &mut ExecutionContext,
- sql: String,
- batch_size: usize,
-) -> Result<()> {
+fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> {
let now = Instant::now();
- let results = ctx.sql(&sql, batch_size)?;
+ let results = ctx.sql(&sql)?;
if results.is_empty() {
println!(
diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs
index 645dea1..08d455e 100644
--- a/rust/datafusion/src/dataframe.rs
+++ b/rust/datafusion/src/dataframe.rs
@@ -44,7 +44,7 @@ use std::sync::Arc;
/// let df = df.filter(col("a").lt_eq(col("b"))).unwrap()
/// .aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap()
/// .limit(100).unwrap();
-/// let results = df.collect(4096);
+/// let results = df.collect();
/// ```
pub trait DataFrame {
/// Filter the DataFrame by column. Returns a new DataFrame only containing the
@@ -130,9 +130,9 @@ pub trait DataFrame {
///
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
- /// let batches = df.collect(4096).unwrap();
+ /// let batches = df.collect().unwrap();
/// ```
- fn collect(&self, batch_size: usize) -> Result<Vec<RecordBatch>>;
+ fn collect(&self) -> Result<Vec<RecordBatch>>;
/// Returns the schema describing the output of this DataFrame in terms of columns returned,
/// where each column has a name, data type, and nullability attribute.
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index c97906c..8f92aae 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -35,27 +35,14 @@ use crate::datasource::TableProvider;
use crate::error::{ExecutionError, Result};
use crate::execution::dataframe_impl::DataFrameImpl;
use crate::execution::physical_plan::common;
-use crate::execution::physical_plan::csv::{CsvExec, CsvReadOptions};
-use crate::execution::physical_plan::datasource::DatasourceExec;
-use crate::execution::physical_plan::explain::ExplainExec;
-use crate::execution::physical_plan::expressions::{
- Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, PhysicalSortExpr, Sum,
-};
-use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
-use crate::execution::physical_plan::limit::GlobalLimitExec;
-use crate::execution::physical_plan::memory::MemoryExec;
+use crate::execution::physical_plan::csv::CsvReadOptions;
use crate::execution::physical_plan::merge::MergeExec;
-use crate::execution::physical_plan::parquet::ParquetExec;
-use crate::execution::physical_plan::projection::ProjectionExec;
+use crate::execution::physical_plan::planner::PhysicalPlannerImpl;
use crate::execution::physical_plan::scalar_functions;
-use crate::execution::physical_plan::selection::SelectionExec;
-use crate::execution::physical_plan::sort::{SortExec, SortOptions};
-use crate::execution::physical_plan::udf::{ScalarFunction, ScalarFunctionExpr};
-use crate::execution::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
-use crate::logicalplan::{
- Expr, FunctionMeta, FunctionType, LogicalPlan, LogicalPlanBuilder, PlanType,
- StringifiedPlan,
-};
+use crate::execution::physical_plan::udf::ScalarFunction;
+use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::PhysicalPlanner;
+use crate::logicalplan::{FunctionMeta, FunctionType, LogicalPlan, LogicalPlanBuilder};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::type_coercion::TypeCoercionRule;
@@ -85,7 +72,7 @@ use crate::sql::{
/// let df = df.filter(col("a").lt_eq(col("b"))).unwrap()
/// .aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap()
/// .limit(100).unwrap();
-/// let results = df.collect(4096);
+/// let results = df.collect();
/// ```
///
/// The following example demonstrates how to execute the same query using SQL:
@@ -97,20 +84,11 @@ use crate::sql::{
///
/// let mut ctx = ExecutionContext::new();
/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap();
-/// let batch_size = 4096;
-/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100", batch_size).unwrap();
+/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").unwrap();
/// ```
pub struct ExecutionContext {
- state: Arc<Mutex<ExecutionContextState>>,
-}
-
-fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
- match value {
- (Ok(e), Ok(e1)) => Ok((e, e1)),
- (Err(e), Ok(_)) => Err(e),
- (Ok(_), Err(e1)) => Err(e1),
- (Err(e), Err(_)) => Err(e),
- }
+ /// Internal state for the context
+ pub state: Arc<Mutex<ExecutionContextState>>,
}
impl ExecutionContext {
@@ -139,22 +117,26 @@ impl ExecutionContext {
Self { state }
}
+ /// Get the configuration of this execution context
+ pub fn config(&self) -> ExecutionConfig {
+ self.state
+ .lock()
+ .expect("failed to lock mutex")
+ .config
+ .clone()
+ }
+
/// Execute a SQL query and produce a Relation (a schema-aware iterator over a series
/// of RecordBatch instances)
- pub fn sql(&mut self, sql: &str, batch_size: usize) -> Result<Vec<RecordBatch>> {
+ pub fn sql(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
let plan = self.create_logical_plan(sql)?;
-
- return self.collect_plan(&plan, batch_size);
+ return self.collect_plan(&plan);
}
/// Executes a logical plan and produce a Relation (a schema-aware iterator over a series
/// of RecordBatch instances). This function is intended for internal use and should not be
/// called directly.
- pub fn collect_plan(
- &mut self,
- plan: &LogicalPlan,
- batch_size: usize,
- ) -> Result<Vec<RecordBatch>> {
+ pub fn collect_plan(&mut self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
match plan {
LogicalPlan::CreateExternalTable {
ref schema,
@@ -185,7 +167,7 @@ impl ExecutionContext {
plan => {
let plan = self.optimize(&plan)?;
- let plan = self.create_physical_plan(&plan, batch_size)?;
+ let plan = self.create_physical_plan(&plan)?;
Ok(self.collect(plan.as_ref())?)
}
}
@@ -373,363 +355,12 @@ impl ExecutionContext {
pub fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
- batch_size: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
- match logical_plan {
- LogicalPlan::TableScan {
- table_name,
- projection,
- ..
- } => match self
- .state
- .lock()
- .expect("failed to lock mutex")
- .datasources
- .lock()
- .expect("failed to lock mutex")
- .get(table_name)
- {
- Some(provider) => {
- let partitions = provider.scan(projection, batch_size)?;
- if partitions.is_empty() {
- Err(ExecutionError::General(
- "Table provider returned no partitions".to_string(),
- ))
- } else {
- let schema = match projection {
- None => provider.schema().clone(),
- Some(p) => Arc::new(Schema::new(
- p.iter()
- .map(|i| provider.schema().field(*i).clone())
- .collect(),
- )),
- };
-
- let exec = DatasourceExec::new(schema, partitions.clone());
- Ok(Arc::new(exec))
- }
- }
- _ => Err(ExecutionError::General(format!(
- "No table named {}",
- table_name
- ))),
- },
- LogicalPlan::InMemoryScan {
- data,
- projection,
- projected_schema,
- ..
- } => Ok(Arc::new(MemoryExec::try_new(
- data,
- Arc::new(projected_schema.as_ref().to_owned()),
- projection.to_owned(),
- )?)),
- LogicalPlan::CsvScan {
- path,
- schema,
- has_header,
- delimiter,
- projection,
- ..
- } => Ok(Arc::new(CsvExec::try_new(
- path,
- CsvReadOptions::new()
- .schema(schema.as_ref())
- .delimiter_option(*delimiter)
- .has_header(*has_header),
- projection.to_owned(),
- batch_size,
- )?)),
- LogicalPlan::ParquetScan {
- path, projection, ..
- } => Ok(Arc::new(ParquetExec::try_new(
- path,
- projection.to_owned(),
- batch_size,
- )?)),
- LogicalPlan::Projection { input, expr, .. } => {
- let input = self.create_physical_plan(input, batch_size)?;
- let input_schema = input.as_ref().schema().clone();
- let runtime_expr = expr
- .iter()
- .map(|e| {
- tuple_err((
- self.create_physical_expr(e, &input_schema),
- e.name(&input_schema),
- ))
- })
- .collect::<Result<Vec<_>>>()?;
- Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
- }
- LogicalPlan::Aggregate {
- input,
- group_expr,
- aggr_expr,
- ..
- } => {
- // Initially need to perform the aggregate and then merge the partitions
- let input = self.create_physical_plan(input, batch_size)?;
- let input_schema = input.as_ref().schema().clone();
-
- let groups = group_expr
- .iter()
- .map(|e| {
- tuple_err((
- self.create_physical_expr(e, &input_schema),
- e.name(&input_schema),
- ))
- })
- .collect::<Result<Vec<_>>>()?;
- let aggregates = aggr_expr
- .iter()
- .map(|e| {
- tuple_err((
- self.create_aggregate_expr(e, &input_schema),
- e.name(&input_schema),
- ))
- })
- .collect::<Result<Vec<_>>>()?;
-
- let initial_aggr = HashAggregateExec::try_new(
- groups.clone(),
- aggregates.clone(),
- input,
- )?;
-
- let schema = initial_aggr.schema();
- let partitions = initial_aggr.partitions()?;
-
- if partitions.len() == 1 {
- return Ok(Arc::new(initial_aggr));
- }
-
- let merge = Arc::new(MergeExec::new(
- schema.clone(),
- partitions,
- self.state
- .lock()
- .expect("failed to lock mutex")
- .config
- .concurrency,
- ));
-
- // construct the expressions for the final aggregation
- let (final_group, final_aggr) = initial_aggr.make_final_expr(
- groups.iter().map(|x| x.1.clone()).collect(),
- aggregates.iter().map(|x| x.1.clone()).collect(),
- );
-
- // construct a second aggregation, keeping the final column name equal to the first aggregation
- // and the expressions corresponding to the respective aggregate
- Ok(Arc::new(HashAggregateExec::try_new(
- final_group
- .iter()
- .enumerate()
- .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
- .collect(),
- final_aggr
- .iter()
- .enumerate()
- .map(|(i, expr)| (expr.clone(), aggregates[i].1.clone()))
- .collect(),
- merge,
- )?))
- }
- LogicalPlan::Selection { input, expr, .. } => {
- let input = self.create_physical_plan(input, batch_size)?;
- let input_schema = input.as_ref().schema().clone();
- let runtime_expr = self.create_physical_expr(expr, &input_schema)?;
- Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?))
- }
- LogicalPlan::Sort { expr, input, .. } => {
- let input = self.create_physical_plan(input, batch_size)?;
- let input_schema = input.as_ref().schema().clone();
-
- let sort_expr = expr
- .iter()
- .map(|e| match e {
- Expr::Sort {
- expr,
- asc,
- nulls_first,
- } => self.create_physical_sort_expr(
- expr,
- &input_schema,
- SortOptions {
- descending: !*asc,
- nulls_first: *nulls_first,
- },
- ),
- _ => Err(ExecutionError::ExecutionError(
- "Sort only accepts sort expressions".to_string(),
- )),
- })
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Arc::new(SortExec::try_new(
- sort_expr,
- input,
- self.state
- .lock()
- .expect("failed to lock mutex")
- .config
- .concurrency,
- )?))
- }
- LogicalPlan::Limit { input, n, .. } => {
- let input = self.create_physical_plan(input, batch_size)?;
- let input_schema = input.as_ref().schema().clone();
-
- Ok(Arc::new(GlobalLimitExec::new(
- input_schema.clone(),
- input.partitions()?,
- *n,
- self.state
- .lock()
- .expect("failed to lock mutex")
- .config
- .concurrency,
- )))
- }
- LogicalPlan::Explain {
- verbose,
- plan,
- stringified_plans,
- schema,
- } => {
- let input = self.create_physical_plan(plan, batch_size)?;
-
- let mut stringified_plans = stringified_plans
- .iter()
- .filter(|s| s.should_display(*verbose))
- .map(|s| s.clone())
- .collect::<Vec<_>>();
-
- // add in the physical plan if requested
- if *verbose {
- stringified_plans.push(StringifiedPlan::new(
- PlanType::PhysicalPlan,
- format!("{:#?}", input),
- ));
- }
- let schema_ref = Arc::new((**schema).clone());
- Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
- }
- _ => Err(ExecutionError::General(
- "Unsupported logical plan variant".to_string(),
- )),
- }
- }
-
- /// Create a physical expression from a logical expression
- pub fn create_physical_expr(
- &self,
- e: &Expr,
- input_schema: &Schema,
- ) -> Result<Arc<dyn PhysicalExpr>> {
- match e {
- Expr::Alias(expr, ..) => Ok(self.create_physical_expr(expr, input_schema)?),
- Expr::Column(name) => {
- // check that name exists
- input_schema.field_with_name(&name)?;
- Ok(Arc::new(Column::new(name)))
- }
- Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
- Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
- self.create_physical_expr(left, input_schema)?,
- op.clone(),
- self.create_physical_expr(right, input_schema)?,
- ))),
- Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new(
- self.create_physical_expr(expr, input_schema)?,
- input_schema,
- data_type.clone(),
- )?)),
- Expr::ScalarFunction {
- name,
- args,
- return_type,
- } => match &self
- .state
- .lock()
- .expect("failed to lock mutex")
- .scalar_functions
- .lock()
- .expect("failed to lock mutex")
- .get(name)
- {
- Some(f) => {
- let mut physical_args = vec![];
- for e in args {
- physical_args.push(self.create_physical_expr(e, input_schema)?);
- }
- Ok(Arc::new(ScalarFunctionExpr::new(
- name,
- Box::new(f.fun.clone()),
- physical_args,
- return_type,
- )))
- }
- _ => Err(ExecutionError::General(format!(
- "Invalid scalar function '{:?}'",
- name
- ))),
- },
- other => Err(ExecutionError::NotImplemented(format!(
- "Physical plan does not support logical expression {:?}",
- other
- ))),
- }
- }
-
- /// Create an aggregate expression from a logical expression
- pub fn create_aggregate_expr(
- &self,
- e: &Expr,
- input_schema: &Schema,
- ) -> Result<Arc<dyn AggregateExpr>> {
- match e {
- Expr::AggregateFunction { name, args, .. } => {
- match name.to_lowercase().as_ref() {
- "sum" => Ok(Arc::new(Sum::new(
- self.create_physical_expr(&args[0], input_schema)?,
- ))),
- "avg" => Ok(Arc::new(Avg::new(
- self.create_physical_expr(&args[0], input_schema)?,
- ))),
- "max" => Ok(Arc::new(Max::new(
- self.create_physical_expr(&args[0], input_schema)?,
- ))),
- "min" => Ok(Arc::new(Min::new(
- self.create_physical_expr(&args[0], input_schema)?,
- ))),
- "count" => Ok(Arc::new(Count::new(
- self.create_physical_expr(&args[0], input_schema)?,
- ))),
- other => Err(ExecutionError::NotImplemented(format!(
- "Unsupported aggregate function '{}'",
- other
- ))),
- }
- }
- other => Err(ExecutionError::General(format!(
- "Invalid aggregate expression '{:?}'",
- other
- ))),
- }
- }
-
- /// Create an aggregate expression from a logical expression
- pub fn create_physical_sort_expr(
- &self,
- e: &Expr,
- input_schema: &Schema,
- options: SortOptions,
- ) -> Result<PhysicalSortExpr> {
- Ok(PhysicalSortExpr {
- expr: self.create_physical_expr(e, input_schema)?,
- options: options,
- })
+ let planner: Arc<dyn PhysicalPlanner> = match self.config().physical_planner {
+ Some(planner) => planner,
+ None => Arc::new(PhysicalPlannerImpl::default()),
+ };
+ planner.create_physical_plan(logical_plan, self.state.clone())
}
/// Execute a physical plan and collect the results in memory
@@ -811,10 +442,14 @@ impl ExecutionContext {
}
/// Configuration options for execution context
-#[derive(Copy, Clone)]
+#[derive(Clone)]
pub struct ExecutionConfig {
/// Number of concurrent threads for query execution.
- concurrency: usize,
+ pub concurrency: usize,
+ /// Default batch size when reading data sources
+ pub batch_size: usize,
+ /// Optional physical planner to override the default physical planner
+ physical_planner: Option<Arc<dyn PhysicalPlanner>>,
}
impl ExecutionConfig {
@@ -822,6 +457,8 @@ impl ExecutionConfig {
pub fn new() -> Self {
Self {
concurrency: num_cpus::get(),
+ batch_size: 4096,
+ physical_planner: None,
}
}
@@ -832,14 +469,34 @@ impl ExecutionConfig {
self.concurrency = n;
self
}
+
+ /// Customize batch size
+ pub fn with_batch_size(mut self, n: usize) -> Self {
+ // batch size must be greater than zero
+ assert!(n > 0);
+ self.batch_size = n;
+ self
+ }
+
+ /// Optional physical planner to override the default physical planner
+ pub fn with_physical_planner(
+ mut self,
+ physical_planner: Arc<dyn PhysicalPlanner>,
+ ) -> Self {
+ self.physical_planner = Some(physical_planner);
+ self
+ }
}
/// Execution context for registering data sources and executing queries
#[derive(Clone)]
pub struct ExecutionContextState {
- datasources: Arc<Mutex<HashMap<String, Box<dyn TableProvider + Send + Sync>>>>,
- scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
- config: ExecutionConfig,
+ /// Data sources that are registered with the context
+ pub datasources: Arc<Mutex<HashMap<String, Box<dyn TableProvider + Send + Sync>>>>,
+ /// Scalar functions that are registered with the context
+ pub scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
+ /// Context configuration
+ pub config: ExecutionConfig,
}
impl SchemaProvider for ExecutionContextState {
@@ -911,7 +568,7 @@ mod tests {
ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
let logical_plan = ctx.optimize(&logical_plan)?;
- let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;
+ let physical_plan = ctx.create_physical_plan(&logical_plan)?;
let results = ctx.collect(physical_plan.as_ref())?;
@@ -955,7 +612,7 @@ mod tests {
\n TableScan: test projection=Some([1])";
assert_eq!(format!("{:?}", optimized_plan), expected);
- let physical_plan = ctx.create_physical_plan(&optimized_plan, 1024)?;
+ let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
@@ -990,7 +647,7 @@ mod tests {
.build()?;
let plan = ctx.optimize(&plan)?;
- let physical_plan = ctx.create_physical_plan(&Arc::new(plan), 1024)?;
+ let physical_plan = ctx.create_physical_plan(&Arc::new(plan))?;
assert_eq!(
physical_plan.schema().field_with_name("c1")?.is_nullable(),
false
@@ -1043,7 +700,7 @@ mod tests {
\n InMemoryScan: projection=Some([1])";
assert_eq!(format!("{:?}", optimized_plan), expected);
- let physical_plan = ctx.create_physical_plan(&optimized_plan, 1024)?;
+ let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("b", physical_plan.schema().field(0).name().as_str());
@@ -1277,7 +934,7 @@ mod tests {
let plan = ctx.optimize(&plan)?;
- let physical_plan = ctx.create_physical_plan(&Arc::new(plan), 1024)?;
+ let physical_plan = ctx.create_physical_plan(&Arc::new(plan))?;
assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
assert_eq!(
"total_salary",
@@ -1418,7 +1075,7 @@ mod tests {
);
let plan = ctx.optimize(&plan)?;
- let plan = ctx.create_physical_plan(&plan, 1024)?;
+ let plan = ctx.create_physical_plan(&plan)?;
let result = ctx.collect(plan.as_ref())?;
let batch = &result[0];
@@ -1452,11 +1109,34 @@ mod tests {
Ok(())
}
+ #[test]
+ fn custom_physical_planner() -> Result<()> {
+ let mut ctx = ExecutionContext::with_config(
+ ExecutionConfig::new().with_physical_planner(Arc::new(MyPhysicalPlanner {})),
+ );
+ ctx.sql("SELECT 1").expect_err("query not supported");
+ Ok(())
+ }
+
+ struct MyPhysicalPlanner {}
+
+ impl PhysicalPlanner for MyPhysicalPlanner {
+ fn create_physical_plan(
+ &self,
+ _logical_plan: &LogicalPlan,
+ _ctx_state: Arc<Mutex<ExecutionContextState>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Err(ExecutionError::NotImplemented(
+ "query not supported".to_string(),
+ ))
+ }
+ }
+
/// Execute SQL and return results
fn collect(ctx: &mut ExecutionContext, sql: &str) -> Result<Vec<RecordBatch>> {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
- let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;
+ let physical_plan = ctx.create_physical_plan(&logical_plan)?;
ctx.collect(physical_plan.as_ref())
}
@@ -1480,7 +1160,7 @@ mod tests {
fn write_csv(ctx: &mut ExecutionContext, sql: &str, out_dir: &str) -> Result<()> {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
- let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;
+ let physical_plan = ctx.create_physical_plan(&logical_plan)?;
ctx.write_csv(physical_plan.as_ref(), out_dir)
}
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs
index bc7afd5..491cb70 100644
--- a/rust/datafusion/src/execution/dataframe_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -122,9 +122,9 @@ impl DataFrame for DataFrameImpl {
self.plan.clone()
}
- fn collect(&self, batch_size: usize) -> Result<Vec<RecordBatch>> {
+ fn collect(&self) -> Result<Vec<RecordBatch>> {
let mut ctx = ExecutionContext::from(self.ctx_state.clone());
- ctx.collect_plan(&self.plan.clone(), batch_size)
+ ctx.collect_plan(&self.plan.clone())
}
/// Returns the schema from the logical plan
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index de76916..4b66955 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -23,7 +23,8 @@ use std::rc::Rc;
use std::sync::{Arc, Mutex};
use crate::error::Result;
-use crate::logicalplan::ScalarValue;
+use crate::execution::context::ExecutionContextState;
+use crate::logicalplan::{LogicalPlan, ScalarValue};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::{
@@ -32,6 +33,16 @@ use arrow::{
};
use udf::ScalarFunction;
+/// Physical query planner
+pub trait PhysicalPlanner {
+ /// Create a physical plan from a logical plan
+ fn create_physical_plan(
+ &self,
+ logical_plan: &LogicalPlan,
+ ctx_state: Arc<Mutex<ExecutionContextState>>,
+ ) -> Result<Arc<dyn ExecutionPlan>>;
+}
+
/// Partition-aware execution plan for a relation
pub trait ExecutionPlan: Debug {
/// Get the schema for this execution plan
@@ -104,6 +115,7 @@ pub mod math_expressions;
pub mod memory;
pub mod merge;
pub mod parquet;
+pub mod planner;
pub mod projection;
pub mod selection;
pub mod sort;
diff --git a/rust/datafusion/src/execution/physical_plan/planner.rs b/rust/datafusion/src/execution/physical_plan/planner.rs
new file mode 100644
index 0000000..7ce845b
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/planner.rs
@@ -0,0 +1,467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Physical query planner
+
+use std::sync::{Arc, Mutex};
+
+use crate::error::{ExecutionError, Result};
+use crate::execution::context::ExecutionContextState;
+use crate::execution::physical_plan::csv::{CsvExec, CsvReadOptions};
+use crate::execution::physical_plan::datasource::DatasourceExec;
+use crate::execution::physical_plan::explain::ExplainExec;
+use crate::execution::physical_plan::expressions::{
+ Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, PhysicalSortExpr, Sum,
+};
+use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
+use crate::execution::physical_plan::limit::GlobalLimitExec;
+use crate::execution::physical_plan::memory::MemoryExec;
+use crate::execution::physical_plan::merge::MergeExec;
+use crate::execution::physical_plan::parquet::ParquetExec;
+use crate::execution::physical_plan::projection::ProjectionExec;
+use crate::execution::physical_plan::selection::SelectionExec;
+use crate::execution::physical_plan::sort::SortExec;
+use crate::execution::physical_plan::udf::ScalarFunctionExpr;
+use crate::execution::physical_plan::{
+ AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner,
+};
+use crate::logicalplan::{Expr, LogicalPlan, PlanType, StringifiedPlan};
+use arrow::compute::SortOptions;
+use arrow::datatypes::Schema;
+
+/// Default physical query planner.
+///
+/// Converts a `LogicalPlan` into a tree of `ExecutionPlan` operators. The type
+/// holds no state of its own: everything it needs is read from the
+/// `ExecutionContextState` passed to each planning call.
+#[derive(Debug, Default)]
+pub struct PhysicalPlannerImpl {}
+
+impl PhysicalPlanner for PhysicalPlannerImpl {
+    /// Create a physical plan from a logical plan.
+    ///
+    /// Child plans are planned first (recursively) and then wrapped in the
+    /// physical operator for the current node. The batch size and concurrency
+    /// handed to the physical operators come from `ctx_state`'s configuration.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a scanned table is not registered, if a table provider
+    /// returns no partitions, if an expression cannot be converted, or if the
+    /// logical plan variant is not supported.
+    fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        ctx_state: Arc<Mutex<ExecutionContextState>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Read the configuration once, in a single short critical section; the
+        // guard is dropped at the end of this block, before any recursion.
+        let (batch_size, concurrency) = {
+            let state = ctx_state.lock().expect("failed to lock mutex");
+            (state.config.batch_size, state.config.concurrency)
+        };
+
+        match logical_plan {
+            LogicalPlan::TableScan {
+                table_name,
+                projection,
+                ..
+            } => match ctx_state
+                .lock()
+                .expect("failed to lock mutex")
+                .datasources
+                .lock()
+                .expect("failed to lock mutex")
+                .get(table_name)
+            {
+                // NOTE: both guards are temporaries of the scrutinee and live until
+                // the end of this `match`, so the context state stays locked while
+                // the provider is scanned.
+                Some(provider) => {
+                    let partitions = provider.scan(projection, batch_size)?;
+                    if partitions.is_empty() {
+                        Err(ExecutionError::General(
+                            "Table provider returned no partitions".to_string(),
+                        ))
+                    } else {
+                        let schema = match projection {
+                            None => provider.schema().clone(),
+                            Some(p) => Arc::new(Schema::new(
+                                p.iter()
+                                    .map(|i| provider.schema().field(*i).clone())
+                                    .collect(),
+                            )),
+                        };
+
+                        let exec = DatasourceExec::new(schema, partitions);
+                        Ok(Arc::new(exec))
+                    }
+                }
+                None => Err(ExecutionError::General(format!(
+                    "No table named {}",
+                    table_name
+                ))),
+            },
+            LogicalPlan::InMemoryScan {
+                data,
+                projection,
+                projected_schema,
+                ..
+            } => Ok(Arc::new(MemoryExec::try_new(
+                data,
+                Arc::new(projected_schema.as_ref().to_owned()),
+                projection.to_owned(),
+            )?)),
+            LogicalPlan::CsvScan {
+                path,
+                schema,
+                has_header,
+                delimiter,
+                projection,
+                ..
+            } => Ok(Arc::new(CsvExec::try_new(
+                path,
+                CsvReadOptions::new()
+                    .schema(schema.as_ref())
+                    .delimiter_option(*delimiter)
+                    .has_header(*has_header),
+                projection.to_owned(),
+                batch_size,
+            )?)),
+            LogicalPlan::ParquetScan {
+                path, projection, ..
+            } => Ok(Arc::new(ParquetExec::try_new(
+                path,
+                projection.to_owned(),
+                batch_size,
+            )?)),
+            LogicalPlan::Projection { input, expr, .. } => {
+                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input_schema = input.as_ref().schema().clone();
+                // Each projected expression is paired with its output column name.
+                let runtime_expr = expr
+                    .iter()
+                    .map(|e| {
+                        tuple_err((
+                            self.create_physical_expr(
+                                e,
+                                &input_schema,
+                                ctx_state.clone(),
+                            ),
+                            e.name(&input_schema),
+                        ))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
+            }
+            LogicalPlan::Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                ..
+            } => {
+                // Initially need to perform the aggregate and then merge the partitions
+                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input_schema = input.as_ref().schema().clone();
+
+                let groups = group_expr
+                    .iter()
+                    .map(|e| {
+                        tuple_err((
+                            self.create_physical_expr(
+                                e,
+                                &input_schema,
+                                ctx_state.clone(),
+                            ),
+                            e.name(&input_schema),
+                        ))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let aggregates = aggr_expr
+                    .iter()
+                    .map(|e| {
+                        tuple_err((
+                            self.create_aggregate_expr(
+                                e,
+                                &input_schema,
+                                ctx_state.clone(),
+                            ),
+                            e.name(&input_schema),
+                        ))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                let initial_aggr = HashAggregateExec::try_new(
+                    groups.clone(),
+                    aggregates.clone(),
+                    input,
+                )?;
+
+                let schema = initial_aggr.schema();
+                let partitions = initial_aggr.partitions()?;
+
+                // A single partition is already fully aggregated; no merge or
+                // second aggregation pass is needed.
+                if partitions.len() == 1 {
+                    return Ok(Arc::new(initial_aggr));
+                }
+
+                let merge = Arc::new(MergeExec::new(schema.clone(), partitions, concurrency));
+
+                // construct the expressions for the final aggregation
+                let (final_group, final_aggr) = initial_aggr.make_final_expr(
+                    groups.iter().map(|x| x.1.clone()).collect(),
+                    aggregates.iter().map(|x| x.1.clone()).collect(),
+                );
+
+                // construct a second aggregation, keeping the final column name equal to the first aggregation
+                // and the expressions corresponding to the respective aggregate
+                Ok(Arc::new(HashAggregateExec::try_new(
+                    final_group
+                        .iter()
+                        .enumerate()
+                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
+                        .collect(),
+                    final_aggr
+                        .iter()
+                        .enumerate()
+                        .map(|(i, expr)| (expr.clone(), aggregates[i].1.clone()))
+                        .collect(),
+                    merge,
+                )?))
+            }
+            LogicalPlan::Selection { input, expr, .. } => {
+                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input_schema = input.as_ref().schema().clone();
+                let runtime_expr =
+                    self.create_physical_expr(expr, &input_schema, ctx_state.clone())?;
+                Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?))
+            }
+            LogicalPlan::Sort { expr, input, .. } => {
+                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input_schema = input.as_ref().schema().clone();
+
+                let sort_expr = expr
+                    .iter()
+                    .map(|e| match e {
+                        Expr::Sort {
+                            expr,
+                            asc,
+                            nulls_first,
+                        } => self.create_physical_sort_expr(
+                            expr,
+                            &input_schema,
+                            SortOptions {
+                                descending: !*asc,
+                                nulls_first: *nulls_first,
+                            },
+                            ctx_state.clone(),
+                        ),
+                        _ => Err(ExecutionError::ExecutionError(
+                            "Sort only accepts sort expressions".to_string(),
+                        )),
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                Ok(Arc::new(SortExec::try_new(sort_expr, input, concurrency)?))
+            }
+            LogicalPlan::Limit { input, n, .. } => {
+                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input_schema = input.as_ref().schema().clone();
+
+                Ok(Arc::new(GlobalLimitExec::new(
+                    input_schema,
+                    input.partitions()?,
+                    *n,
+                    concurrency,
+                )))
+            }
+            LogicalPlan::Explain {
+                verbose,
+                plan,
+                stringified_plans,
+                schema,
+            } => {
+                let input = self.create_physical_plan(plan, ctx_state.clone())?;
+
+                let mut stringified_plans = stringified_plans
+                    .iter()
+                    .filter(|s| s.should_display(*verbose))
+                    .cloned()
+                    .collect::<Vec<_>>();
+
+                // add in the physical plan if requested
+                if *verbose {
+                    stringified_plans.push(StringifiedPlan::new(
+                        PlanType::PhysicalPlan,
+                        format!("{:#?}", input),
+                    ));
+                }
+                let schema_ref = Arc::new((**schema).clone());
+                Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
+            }
+            _ => Err(ExecutionError::General(
+                "Unsupported logical plan variant".to_string(),
+            )),
+        }
+    }
+}
+
+impl PhysicalPlannerImpl {
+    /// Create a physical expression from a logical expression.
+    ///
+    /// Column references are validated against `input_schema`. Scalar functions
+    /// are resolved by name from the registry held in `ctx_state`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a column is not present in `input_schema`, if a scalar
+    /// function is not registered, or if the expression kind has no physical
+    /// equivalent (`ExecutionError::NotImplemented`).
+    pub fn create_physical_expr(
+        &self,
+        e: &Expr,
+        input_schema: &Schema,
+        ctx_state: Arc<Mutex<ExecutionContextState>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        match e {
+            // An alias only renames its output; plan the wrapped expression.
+            Expr::Alias(expr, ..) => {
+                self.create_physical_expr(expr, input_schema, ctx_state)
+            }
+            Expr::Column(name) => {
+                // check that name exists
+                input_schema.field_with_name(name)?;
+                Ok(Arc::new(Column::new(name)))
+            }
+            Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
+            Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
+                self.create_physical_expr(left, input_schema, ctx_state.clone())?,
+                op.clone(),
+                self.create_physical_expr(right, input_schema, ctx_state.clone())?,
+            ))),
+            Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new(
+                self.create_physical_expr(expr, input_schema, ctx_state.clone())?,
+                input_schema,
+                data_type.clone(),
+            )?)),
+            Expr::ScalarFunction {
+                name,
+                args,
+                return_type,
+            } => {
+                // Copy the function out of the registry so that both mutex guards are
+                // released *before* the arguments are planned. `std::sync::Mutex` is
+                // not reentrant: holding the guard while planning an argument that is
+                // itself a scalar function (e.g. `f(g(x))`) would lock it a second
+                // time on the same thread and deadlock or panic.
+                let fun = ctx_state
+                    .lock()
+                    .expect("failed to lock mutex")
+                    .scalar_functions
+                    .lock()
+                    .expect("failed to lock mutex")
+                    .get(name)
+                    .map(|f| f.fun.clone());
+                match fun {
+                    Some(fun) => {
+                        let physical_args = args
+                            .iter()
+                            .map(|e| {
+                                self.create_physical_expr(
+                                    e,
+                                    input_schema,
+                                    ctx_state.clone(),
+                                )
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+                        Ok(Arc::new(ScalarFunctionExpr::new(
+                            name,
+                            Box::new(fun),
+                            physical_args,
+                            return_type,
+                        )))
+                    }
+                    None => Err(ExecutionError::General(format!(
+                        "Invalid scalar function '{:?}'",
+                        name
+                    ))),
+                }
+            }
+            other => Err(ExecutionError::NotImplemented(format!(
+                "Physical plan does not support logical expression {:?}",
+                other
+            ))),
+        }
+    }
+
+    /// Create an aggregate expression from a logical expression.
+    ///
+    /// Supports `sum`, `avg`, `max`, `min` and `count` (name matched
+    /// case-insensitively), each applied to the first argument of the call.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ExecutionError::NotImplemented` for an unsupported function name,
+    /// and `ExecutionError::General` if `e` is not an aggregate function call or
+    /// if the call has no arguments.
+    pub fn create_aggregate_expr(
+        &self,
+        e: &Expr,
+        input_schema: &Schema,
+        ctx_state: Arc<Mutex<ExecutionContextState>>,
+    ) -> Result<Arc<dyn AggregateExpr>> {
+        match e {
+            Expr::AggregateFunction { name, args, .. } => {
+                // Plan the argument lazily so an unsupported name is still reported
+                // as NotImplemented, and an empty argument list yields an error
+                // instead of an index-out-of-bounds panic.
+                let physical_arg = || -> Result<Arc<dyn PhysicalExpr>> {
+                    let arg = args.first().ok_or_else(|| {
+                        ExecutionError::General(format!(
+                            "Aggregate function '{}' requires an argument",
+                            name
+                        ))
+                    })?;
+                    self.create_physical_expr(arg, input_schema, ctx_state.clone())
+                };
+                match name.to_lowercase().as_ref() {
+                    "sum" => Ok(Arc::new(Sum::new(physical_arg()?))),
+                    "avg" => Ok(Arc::new(Avg::new(physical_arg()?))),
+                    "max" => Ok(Arc::new(Max::new(physical_arg()?))),
+                    "min" => Ok(Arc::new(Min::new(physical_arg()?))),
+                    "count" => Ok(Arc::new(Count::new(physical_arg()?))),
+                    other => Err(ExecutionError::NotImplemented(format!(
+                        "Unsupported aggregate function '{}'",
+                        other
+                    ))),
+                }
+            }
+            other => Err(ExecutionError::General(format!(
+                "Invalid aggregate expression '{:?}'",
+                other
+            ))),
+        }
+    }
+
+    /// Create a physical sort expression from a logical expression.
+    ///
+    /// `e` is the sort key itself (not an `Expr::Sort` wrapper); the sort
+    /// direction and null ordering come from `options`.
+    ///
+    /// # Errors
+    ///
+    /// Returns any error produced while converting `e` with `create_physical_expr`.
+    pub fn create_physical_sort_expr(
+        &self,
+        e: &Expr,
+        input_schema: &Schema,
+        options: SortOptions,
+        ctx_state: Arc<Mutex<ExecutionContextState>>,
+    ) -> Result<PhysicalSortExpr> {
+        Ok(PhysicalSortExpr {
+            expr: self.create_physical_expr(e, input_schema, ctx_state)?,
+            options,
+        })
+    }
+}
+
+/// Turn a pair of results into a result of a pair.
+///
+/// The first element is checked before the second, so when both are errors
+/// the first error is returned and the second is discarded.
+fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
+    let (left, right) = value;
+    Ok((left?, right?))
+}
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index a258d3d..d2d5349 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -31,8 +31,6 @@ use datafusion::execution::context::ExecutionContext;
use datafusion::execution::physical_plan::udf::ScalarFunction;
use datafusion::logicalplan::LogicalPlan;
-const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
-
#[test]
fn nyc() -> Result<()> {
// schema for nyxtaxi csv files
@@ -111,7 +109,7 @@ fn parquet_single_nan_schema() {
let sql = "SELECT mycol FROM single_nan";
let plan = ctx.create_logical_plan(&sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap();
+ let plan = ctx.create_physical_plan(&plan).unwrap();
let results = ctx.collect(plan.as_ref()).unwrap();
for batch in results {
assert_eq!(1, batch.num_rows());
@@ -277,7 +275,7 @@ fn csv_query_avg_multi_batch() -> Result<()> {
let sql = "SELECT avg(c12) FROM aggregate_test_100";
let plan = ctx.create_logical_plan(&sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap();
+ let plan = ctx.create_physical_plan(&plan).unwrap();
let results = ctx.collect(plan.as_ref()).unwrap();
let batch = &results[0];
let column = batch.column(0);
@@ -489,9 +487,8 @@ fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
// TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once
// unsigned is supported.
- ctx.sql(
- &format!(
- "
+ ctx.sql(&format!(
+ "
CREATE EXTERNAL TABLE aggregate_test_100 (
c1 VARCHAR NOT NULL,
c2 INT NOT NULL,
@@ -511,10 +508,8 @@ fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
WITH HEADER ROW
LOCATION '{}/csv/aggregate_test_100.csv'
",
- testdata
- ),
- 1024,
- )
+ testdata
+ ))
.unwrap();
}
@@ -542,7 +537,7 @@ fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
/// Test helper: plans and runs `sql` on `ctx` (logical plan, optimization, physical
/// plan, collect) and returns the batches rendered by `result_str`.
/// Panics (via `unwrap`) if any step fails.
fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<String> {
let plan = ctx.create_logical_plan(&sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
- let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap();
+ let plan = ctx.create_physical_plan(&plan).unwrap();
let results = ctx.collect(plan.as_ref()).unwrap();
result_str(&results)
}