You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/16 18:52:04 UTC

[arrow] branch master updated: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c58141  ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced
2c58141 is described below

commit 2c58141284c1220652a1e48390a43c4da9918848
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Aug 16 12:51:18 2020 -0600

    ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced
    
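    This PR mainly moves the physical query planning logic out of `ExecutionContext` and into its own struct (`PhysicalPlannerImpl`). There is a new `PhysicalPlanner` trait, and it is now possible to bring your own planner if the one in DataFusion doesn't meet your needs (for example, if you need to implement distributed query execution). The batch size is now part of `ExecutionConfig` (set via `with_batch_size`, default 4096), so `ExecutionContext::sql`, `ExecutionContext::create_physical_plan`, and `DataFrame::collect` no longer take a `batch_size` argument.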
    
    Pseudo-code example:
    
    ```rust
    let config = ExecutionConfig::new().with_physical_planner(Arc::new(MyPhysicalPlanner::new()));
    let mut ctx = ExecutionContext::with_config(config);
    ctx.sql("SELECT * FROM foo");
    ```
    
    Closes #7975 from andygrove/physical-planner-trait
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/benchmarks/src/bin/nyctaxi.rs                 |  18 +-
 rust/benchmarks/src/bin/tpch.rs                    |  18 +-
 rust/datafusion/benches/aggregate_query_sql.rs     |   2 +-
 rust/datafusion/examples/csv_sql.rs                |   8 +-
 rust/datafusion/examples/dataframe.rs              |   3 +-
 rust/datafusion/examples/flight_server.rs          |   2 +-
 rust/datafusion/examples/memory_table_api.rs       |   2 +-
 rust/datafusion/examples/parquet_sql.rs            |   7 +-
 rust/datafusion/src/bin/repl.rs                    |  15 +-
 rust/datafusion/src/dataframe.rs                   |   6 +-
 rust/datafusion/src/execution/context.rs           | 500 ++++-----------------
 rust/datafusion/src/execution/dataframe_impl.rs    |   4 +-
 rust/datafusion/src/execution/physical_plan/mod.rs |  14 +-
 .../src/execution/physical_plan/planner.rs         | 467 +++++++++++++++++++
 rust/datafusion/tests/sql.rs                       |  19 +-
 15 files changed, 617 insertions(+), 468 deletions(-)

diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs
index 3aa25d2..7468400 100644
--- a/rust/benchmarks/src/bin/nyctaxi.rs
+++ b/rust/benchmarks/src/bin/nyctaxi.rs
@@ -62,7 +62,9 @@ fn main() -> Result<()> {
     let opt = Opt::from_args();
     println!("Running benchmarks with the following options: {:?}", opt);
 
-    let config = ExecutionConfig::new().with_concurrency(opt.concurrency);
+    let config = ExecutionConfig::new()
+        .with_concurrency(opt.concurrency)
+        .with_batch_size(opt.batch_size);
     let mut ctx = ExecutionContext::with_config(config);
 
     let path = opt.path.to_str().unwrap();
@@ -80,13 +82,12 @@ fn main() -> Result<()> {
         }
     }
 
-    datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.batch_size, opt.debug)
+    datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.debug)
 }
 
 fn datafusion_sql_benchmarks(
     ctx: &mut ExecutionContext,
     iterations: usize,
-    batch_size: usize,
     debug: bool,
 ) -> Result<()> {
     let mut queries = HashMap::new();
@@ -95,7 +96,7 @@ fn datafusion_sql_benchmarks(
         println!("Executing '{}'", name);
         for i in 0..iterations {
             let start = Instant::now();
-            execute_sql(ctx, sql, batch_size, debug)?;
+            execute_sql(ctx, sql, debug)?;
             println!(
                 "Query '{}' iteration {} took {} ms",
                 name,
@@ -107,18 +108,13 @@ fn datafusion_sql_benchmarks(
     Ok(())
 }
 
-fn execute_sql(
-    ctx: &mut ExecutionContext,
-    sql: &str,
-    batch_size: usize,
-    debug: bool,
-) -> Result<()> {
+fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
     let plan = ctx.create_logical_plan(sql)?;
     let plan = ctx.optimize(&plan)?;
     if debug {
         println!("Optimized logical plan:\n{:?}", plan);
     }
-    let physical_plan = ctx.create_physical_plan(&plan, batch_size)?;
+    let physical_plan = ctx.create_physical_plan(&plan)?;
     let result = ctx.collect(physical_plan.as_ref())?;
     if debug {
         pretty::print_batches(&result)?;
diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs
index a8a3b96..e556dc5 100644
--- a/rust/benchmarks/src/bin/tpch.rs
+++ b/rust/benchmarks/src/bin/tpch.rs
@@ -65,9 +65,10 @@ fn main() -> Result<()> {
     let opt = TpchOpt::from_args();
     println!("Running benchmarks with the following options: {:?}", opt);
 
-    let mut ctx = ExecutionContext::with_config(
-        ExecutionConfig::new().with_concurrency(opt.concurrency),
-    );
+    let config = ExecutionConfig::new()
+        .with_concurrency(opt.concurrency)
+        .with_batch_size(opt.batch_size);
+    let mut ctx = ExecutionContext::with_config(config);
 
     let path = opt.path.to_str().unwrap();
 
@@ -118,7 +119,7 @@ fn main() -> Result<()> {
 
     for i in 0..opt.iterations {
         let start = Instant::now();
-        execute_sql(&mut ctx, sql, opt.batch_size, opt.debug)?;
+        execute_sql(&mut ctx, sql, opt.debug)?;
         println!(
             "Query {} iteration {} took {} ms",
             opt.query,
@@ -130,18 +131,13 @@ fn main() -> Result<()> {
     Ok(())
 }
 
-fn execute_sql(
-    ctx: &mut ExecutionContext,
-    sql: &str,
-    batch_size: usize,
-    debug: bool,
-) -> Result<()> {
+fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
     let plan = ctx.create_logical_plan(sql)?;
     let plan = ctx.optimize(&plan)?;
     if debug {
         println!("Optimized logical plan:\n{:?}", plan);
     }
-    let physical_plan = ctx.create_physical_plan(&plan, batch_size)?;
+    let physical_plan = ctx.create_physical_plan(&plan)?;
     let result = ctx.collect(physical_plan.as_ref())?;
     if debug {
         pretty::print_batches(&result)?;
diff --git a/rust/datafusion/benches/aggregate_query_sql.rs b/rust/datafusion/benches/aggregate_query_sql.rs
index ab374a0..b42e7fc 100644
--- a/rust/datafusion/benches/aggregate_query_sql.rs
+++ b/rust/datafusion/benches/aggregate_query_sql.rs
@@ -32,7 +32,7 @@ use datafusion::execution::context::ExecutionContext;
 
 fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
     // execute the query
-    let results = ctx.sql(&sql, 1024 * 1024).unwrap();
+    let results = ctx.sql(&sql).unwrap();
 
     // display the relation
     for _batch in results {}
diff --git a/rust/datafusion/examples/csv_sql.rs b/rust/datafusion/examples/csv_sql.rs
index df17d44..a5f3837 100644
--- a/rust/datafusion/examples/csv_sql.rs
+++ b/rust/datafusion/examples/csv_sql.rs
@@ -37,8 +37,12 @@ fn main() -> Result<()> {
     )?;
 
     // execute the query
-    let batch_size = 4096;
-    let results = ctx.sql("SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1", batch_size)?;
+    let results = ctx.sql(
+        "SELECT c1, MIN(c12), MAX(c12) \
+        FROM aggregate_test_100 \
+        WHERE c11 > 0.1 AND c11 < 0.9 \
+        GROUP BY c1",
+    )?;
 
     // print the results
     pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/dataframe.rs b/rust/datafusion/examples/dataframe.rs
index df9a09b..4b931b6 100644
--- a/rust/datafusion/examples/dataframe.rs
+++ b/rust/datafusion/examples/dataframe.rs
@@ -38,8 +38,7 @@ fn main() -> Result<()> {
         .filter(col("tinyint_col").lt(col("tinyint_col")))?;
 
     // execute the query
-    let batch_size = 4096;
-    let results = df.collect(batch_size)?;
+    let results = df.collect()?;
 
     // print the results
     pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs
index a3eff83..c71a758 100644
--- a/rust/datafusion/examples/flight_server.rs
+++ b/rust/datafusion/examples/flight_server.rs
@@ -99,7 +99,7 @@ impl FlightService for FlightServiceImpl {
                 let plan = ctx
                     .create_logical_plan(&sql)
                     .and_then(|plan| ctx.optimize(&plan))
-                    .and_then(|plan| ctx.create_physical_plan(&plan, 1024 * 1024))
+                    .and_then(|plan| ctx.create_physical_plan(&plan))
                     .map_err(|e| to_tonic_err(&e))?;
 
                 // execute the query
diff --git a/rust/datafusion/examples/memory_table_api.rs b/rust/datafusion/examples/memory_table_api.rs
index 9f6c057..ee85785 100644
--- a/rust/datafusion/examples/memory_table_api.rs
+++ b/rust/datafusion/examples/memory_table_api.rs
@@ -59,7 +59,7 @@ fn main() -> Result<()> {
     let df = df.select_columns(vec!["a", "b"])?.filter(filter)?;
 
     // execute
-    let results = df.collect(10)?;
+    let results = df.collect()?;
 
     // print the results
     pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/parquet_sql.rs b/rust/datafusion/examples/parquet_sql.rs
index d05cf26..f73a2ae 100644
--- a/rust/datafusion/examples/parquet_sql.rs
+++ b/rust/datafusion/examples/parquet_sql.rs
@@ -35,8 +35,11 @@ fn main() -> Result<()> {
     )?;
 
     // execute the query
-    let batch_size = 4096;
-    let results = ctx.sql("SELECT int_col, double_col, CAST(date_string_col as VARCHAR) FROM alltypes_plain WHERE id > 1 AND tinyint_col < double_col", batch_size)?;
+    let results = ctx.sql(
+        "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
+        FROM alltypes_plain \
+        WHERE id > 1 AND tinyint_col < double_col",
+    )?;
 
     // print the results
     pretty::print_batches(&results)?;
diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs
index 4a8464b..74d4320 100644
--- a/rust/datafusion/src/bin/repl.rs
+++ b/rust/datafusion/src/bin/repl.rs
@@ -20,7 +20,7 @@
 use arrow::util::pretty;
 use clap::{crate_version, App, Arg};
 use datafusion::error::Result;
-use datafusion::execution::context::ExecutionContext;
+use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use rustyline::Editor;
 use std::env;
 use std::path::Path;
@@ -60,7 +60,8 @@ pub fn main() {
         .map(|size| size.parse::<usize>().unwrap())
         .unwrap_or(1_048_576);
 
-    let mut ctx = ExecutionContext::new();
+    let mut ctx =
+        ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(batch_size));
 
     let mut rl = Editor::<()>::new();
     rl.load_history(".history").ok();
@@ -75,7 +76,7 @@ pub fn main() {
             Ok(ref line) if line.trim_end().ends_with(';') => {
                 query.push_str(line.trim_end());
                 rl.add_history_entry(query.clone());
-                match exec_and_print(&mut ctx, query, batch_size) {
+                match exec_and_print(&mut ctx, query) {
                     Ok(_) => {}
                     Err(err) => println!("{:?}", err),
                 }
@@ -99,14 +100,10 @@ fn is_exit_command(line: &str) -> bool {
     line == "quit" || line == "exit"
 }
 
-fn exec_and_print(
-    ctx: &mut ExecutionContext,
-    sql: String,
-    batch_size: usize,
-) -> Result<()> {
+fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> {
     let now = Instant::now();
 
-    let results = ctx.sql(&sql, batch_size)?;
+    let results = ctx.sql(&sql)?;
 
     if results.is_empty() {
         println!(
diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs
index 645dea1..08d455e 100644
--- a/rust/datafusion/src/dataframe.rs
+++ b/rust/datafusion/src/dataframe.rs
@@ -44,7 +44,7 @@ use std::sync::Arc;
 /// let df = df.filter(col("a").lt_eq(col("b"))).unwrap()
 ///            .aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap()
 ///            .limit(100).unwrap();
-/// let results = df.collect(4096);
+/// let results = df.collect();
 /// ```
 pub trait DataFrame {
     /// Filter the DataFrame by column. Returns a new DataFrame only containing the
@@ -130,9 +130,9 @@ pub trait DataFrame {
     ///
     /// let mut ctx = ExecutionContext::new();
     /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
-    /// let batches = df.collect(4096).unwrap();
+    /// let batches = df.collect().unwrap();
     /// ```
-    fn collect(&self, batch_size: usize) -> Result<Vec<RecordBatch>>;
+    fn collect(&self) -> Result<Vec<RecordBatch>>;
 
     /// Returns the schema describing the output of this DataFrame in terms of columns returned,
     /// where each column has a name, data type, and nullability attribute.
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index c97906c..8f92aae 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -35,27 +35,14 @@ use crate::datasource::TableProvider;
 use crate::error::{ExecutionError, Result};
 use crate::execution::dataframe_impl::DataFrameImpl;
 use crate::execution::physical_plan::common;
-use crate::execution::physical_plan::csv::{CsvExec, CsvReadOptions};
-use crate::execution::physical_plan::datasource::DatasourceExec;
-use crate::execution::physical_plan::explain::ExplainExec;
-use crate::execution::physical_plan::expressions::{
-    Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, PhysicalSortExpr, Sum,
-};
-use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
-use crate::execution::physical_plan::limit::GlobalLimitExec;
-use crate::execution::physical_plan::memory::MemoryExec;
+use crate::execution::physical_plan::csv::CsvReadOptions;
 use crate::execution::physical_plan::merge::MergeExec;
-use crate::execution::physical_plan::parquet::ParquetExec;
-use crate::execution::physical_plan::projection::ProjectionExec;
+use crate::execution::physical_plan::planner::PhysicalPlannerImpl;
 use crate::execution::physical_plan::scalar_functions;
-use crate::execution::physical_plan::selection::SelectionExec;
-use crate::execution::physical_plan::sort::{SortExec, SortOptions};
-use crate::execution::physical_plan::udf::{ScalarFunction, ScalarFunctionExpr};
-use crate::execution::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
-use crate::logicalplan::{
-    Expr, FunctionMeta, FunctionType, LogicalPlan, LogicalPlanBuilder, PlanType,
-    StringifiedPlan,
-};
+use crate::execution::physical_plan::udf::ScalarFunction;
+use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::PhysicalPlanner;
+use crate::logicalplan::{FunctionMeta, FunctionType, LogicalPlan, LogicalPlanBuilder};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::projection_push_down::ProjectionPushDown;
 use crate::optimizer::type_coercion::TypeCoercionRule;
@@ -85,7 +72,7 @@ use crate::sql::{
 /// let df = df.filter(col("a").lt_eq(col("b"))).unwrap()
 ///            .aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap()
 ///            .limit(100).unwrap();
-/// let results = df.collect(4096);
+/// let results = df.collect();
 /// ```
 ///
 /// The following example demonstrates how to execute the same query using SQL:
@@ -97,20 +84,11 @@ use crate::sql::{
 ///
 /// let mut ctx = ExecutionContext::new();
 /// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap();
-/// let batch_size = 4096;
-/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100", batch_size).unwrap();
+/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").unwrap();
 /// ```
 pub struct ExecutionContext {
-    state: Arc<Mutex<ExecutionContextState>>,
-}
-
-fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
-    match value {
-        (Ok(e), Ok(e1)) => Ok((e, e1)),
-        (Err(e), Ok(_)) => Err(e),
-        (Ok(_), Err(e1)) => Err(e1),
-        (Err(e), Err(_)) => Err(e),
-    }
+    /// Internal state for the context
+    pub state: Arc<Mutex<ExecutionContextState>>,
 }
 
 impl ExecutionContext {
@@ -139,22 +117,26 @@ impl ExecutionContext {
         Self { state }
     }
 
+    /// Get the configuration of this execution context
+    pub fn config(&self) -> ExecutionConfig {
+        self.state
+            .lock()
+            .expect("failed to lock mutex")
+            .config
+            .clone()
+    }
+
     /// Execute a SQL query and produce a Relation (a schema-aware iterator over a series
     /// of RecordBatch instances)
-    pub fn sql(&mut self, sql: &str, batch_size: usize) -> Result<Vec<RecordBatch>> {
+    pub fn sql(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
         let plan = self.create_logical_plan(sql)?;
-
-        return self.collect_plan(&plan, batch_size);
+        return self.collect_plan(&plan);
     }
 
     /// Executes a logical plan and produce a Relation (a schema-aware iterator over a series
     /// of RecordBatch instances). This function is intended for internal use and should not be
     /// called directly.
-    pub fn collect_plan(
-        &mut self,
-        plan: &LogicalPlan,
-        batch_size: usize,
-    ) -> Result<Vec<RecordBatch>> {
+    pub fn collect_plan(&mut self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
         match plan {
             LogicalPlan::CreateExternalTable {
                 ref schema,
@@ -185,7 +167,7 @@ impl ExecutionContext {
 
             plan => {
                 let plan = self.optimize(&plan)?;
-                let plan = self.create_physical_plan(&plan, batch_size)?;
+                let plan = self.create_physical_plan(&plan)?;
                 Ok(self.collect(plan.as_ref())?)
             }
         }
@@ -373,363 +355,12 @@ impl ExecutionContext {
     pub fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        batch_size: usize,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match logical_plan {
-            LogicalPlan::TableScan {
-                table_name,
-                projection,
-                ..
-            } => match self
-                .state
-                .lock()
-                .expect("failed to lock mutex")
-                .datasources
-                .lock()
-                .expect("failed to lock mutex")
-                .get(table_name)
-            {
-                Some(provider) => {
-                    let partitions = provider.scan(projection, batch_size)?;
-                    if partitions.is_empty() {
-                        Err(ExecutionError::General(
-                            "Table provider returned no partitions".to_string(),
-                        ))
-                    } else {
-                        let schema = match projection {
-                            None => provider.schema().clone(),
-                            Some(p) => Arc::new(Schema::new(
-                                p.iter()
-                                    .map(|i| provider.schema().field(*i).clone())
-                                    .collect(),
-                            )),
-                        };
-
-                        let exec = DatasourceExec::new(schema, partitions.clone());
-                        Ok(Arc::new(exec))
-                    }
-                }
-                _ => Err(ExecutionError::General(format!(
-                    "No table named {}",
-                    table_name
-                ))),
-            },
-            LogicalPlan::InMemoryScan {
-                data,
-                projection,
-                projected_schema,
-                ..
-            } => Ok(Arc::new(MemoryExec::try_new(
-                data,
-                Arc::new(projected_schema.as_ref().to_owned()),
-                projection.to_owned(),
-            )?)),
-            LogicalPlan::CsvScan {
-                path,
-                schema,
-                has_header,
-                delimiter,
-                projection,
-                ..
-            } => Ok(Arc::new(CsvExec::try_new(
-                path,
-                CsvReadOptions::new()
-                    .schema(schema.as_ref())
-                    .delimiter_option(*delimiter)
-                    .has_header(*has_header),
-                projection.to_owned(),
-                batch_size,
-            )?)),
-            LogicalPlan::ParquetScan {
-                path, projection, ..
-            } => Ok(Arc::new(ParquetExec::try_new(
-                path,
-                projection.to_owned(),
-                batch_size,
-            )?)),
-            LogicalPlan::Projection { input, expr, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-                let runtime_expr = expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_physical_expr(e, &input_schema),
-                            e.name(&input_schema),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-                Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                ..
-            } => {
-                // Initially need to perform the aggregate and then merge the partitions
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-
-                let groups = group_expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_physical_expr(e, &input_schema),
-                            e.name(&input_schema),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-                let aggregates = aggr_expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_aggregate_expr(e, &input_schema),
-                            e.name(&input_schema),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                let initial_aggr = HashAggregateExec::try_new(
-                    groups.clone(),
-                    aggregates.clone(),
-                    input,
-                )?;
-
-                let schema = initial_aggr.schema();
-                let partitions = initial_aggr.partitions()?;
-
-                if partitions.len() == 1 {
-                    return Ok(Arc::new(initial_aggr));
-                }
-
-                let merge = Arc::new(MergeExec::new(
-                    schema.clone(),
-                    partitions,
-                    self.state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
-                ));
-
-                // construct the expressions for the final aggregation
-                let (final_group, final_aggr) = initial_aggr.make_final_expr(
-                    groups.iter().map(|x| x.1.clone()).collect(),
-                    aggregates.iter().map(|x| x.1.clone()).collect(),
-                );
-
-                // construct a second aggregation, keeping the final column name equal to the first aggregation
-                // and the expressions corresponding to the respective aggregate
-                Ok(Arc::new(HashAggregateExec::try_new(
-                    final_group
-                        .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
-                        .collect(),
-                    final_aggr
-                        .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), aggregates[i].1.clone()))
-                        .collect(),
-                    merge,
-                )?))
-            }
-            LogicalPlan::Selection { input, expr, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-                let runtime_expr = self.create_physical_expr(expr, &input_schema)?;
-                Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?))
-            }
-            LogicalPlan::Sort { expr, input, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-
-                let sort_expr = expr
-                    .iter()
-                    .map(|e| match e {
-                        Expr::Sort {
-                            expr,
-                            asc,
-                            nulls_first,
-                        } => self.create_physical_sort_expr(
-                            expr,
-                            &input_schema,
-                            SortOptions {
-                                descending: !*asc,
-                                nulls_first: *nulls_first,
-                            },
-                        ),
-                        _ => Err(ExecutionError::ExecutionError(
-                            "Sort only accepts sort expressions".to_string(),
-                        )),
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                Ok(Arc::new(SortExec::try_new(
-                    sort_expr,
-                    input,
-                    self.state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
-                )?))
-            }
-            LogicalPlan::Limit { input, n, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-
-                Ok(Arc::new(GlobalLimitExec::new(
-                    input_schema.clone(),
-                    input.partitions()?,
-                    *n,
-                    self.state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
-                )))
-            }
-            LogicalPlan::Explain {
-                verbose,
-                plan,
-                stringified_plans,
-                schema,
-            } => {
-                let input = self.create_physical_plan(plan, batch_size)?;
-
-                let mut stringified_plans = stringified_plans
-                    .iter()
-                    .filter(|s| s.should_display(*verbose))
-                    .map(|s| s.clone())
-                    .collect::<Vec<_>>();
-
-                // add in the physical plan if requested
-                if *verbose {
-                    stringified_plans.push(StringifiedPlan::new(
-                        PlanType::PhysicalPlan,
-                        format!("{:#?}", input),
-                    ));
-                }
-                let schema_ref = Arc::new((**schema).clone());
-                Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
-            }
-            _ => Err(ExecutionError::General(
-                "Unsupported logical plan variant".to_string(),
-            )),
-        }
-    }
-
-    /// Create a physical expression from a logical expression
-    pub fn create_physical_expr(
-        &self,
-        e: &Expr,
-        input_schema: &Schema,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
-        match e {
-            Expr::Alias(expr, ..) => Ok(self.create_physical_expr(expr, input_schema)?),
-            Expr::Column(name) => {
-                // check that name exists
-                input_schema.field_with_name(&name)?;
-                Ok(Arc::new(Column::new(name)))
-            }
-            Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
-            Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
-                self.create_physical_expr(left, input_schema)?,
-                op.clone(),
-                self.create_physical_expr(right, input_schema)?,
-            ))),
-            Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new(
-                self.create_physical_expr(expr, input_schema)?,
-                input_schema,
-                data_type.clone(),
-            )?)),
-            Expr::ScalarFunction {
-                name,
-                args,
-                return_type,
-            } => match &self
-                .state
-                .lock()
-                .expect("failed to lock mutex")
-                .scalar_functions
-                .lock()
-                .expect("failed to lock mutex")
-                .get(name)
-            {
-                Some(f) => {
-                    let mut physical_args = vec![];
-                    for e in args {
-                        physical_args.push(self.create_physical_expr(e, input_schema)?);
-                    }
-                    Ok(Arc::new(ScalarFunctionExpr::new(
-                        name,
-                        Box::new(f.fun.clone()),
-                        physical_args,
-                        return_type,
-                    )))
-                }
-                _ => Err(ExecutionError::General(format!(
-                    "Invalid scalar function '{:?}'",
-                    name
-                ))),
-            },
-            other => Err(ExecutionError::NotImplemented(format!(
-                "Physical plan does not support logical expression {:?}",
-                other
-            ))),
-        }
-    }
-
-    /// Create an aggregate expression from a logical expression
-    pub fn create_aggregate_expr(
-        &self,
-        e: &Expr,
-        input_schema: &Schema,
-    ) -> Result<Arc<dyn AggregateExpr>> {
-        match e {
-            Expr::AggregateFunction { name, args, .. } => {
-                match name.to_lowercase().as_ref() {
-                    "sum" => Ok(Arc::new(Sum::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "avg" => Ok(Arc::new(Avg::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "max" => Ok(Arc::new(Max::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "min" => Ok(Arc::new(Min::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "count" => Ok(Arc::new(Count::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    other => Err(ExecutionError::NotImplemented(format!(
-                        "Unsupported aggregate function '{}'",
-                        other
-                    ))),
-                }
-            }
-            other => Err(ExecutionError::General(format!(
-                "Invalid aggregate expression '{:?}'",
-                other
-            ))),
-        }
-    }
-
-    /// Create an aggregate expression from a logical expression
-    pub fn create_physical_sort_expr(
-        &self,
-        e: &Expr,
-        input_schema: &Schema,
-        options: SortOptions,
-    ) -> Result<PhysicalSortExpr> {
-        Ok(PhysicalSortExpr {
-            expr: self.create_physical_expr(e, input_schema)?,
-            options: options,
-        })
+        let planner: Arc<dyn PhysicalPlanner> = match self.config().physical_planner {
+            Some(planner) => planner,
+            None => Arc::new(PhysicalPlannerImpl::default()),
+        };
+        planner.create_physical_plan(logical_plan, self.state.clone())
     }
 
     /// Execute a physical plan and collect the results in memory
@@ -811,10 +442,14 @@ impl ExecutionContext {
 }
 
 /// Configuration options for execution context
-#[derive(Copy, Clone)]
+#[derive(Clone)]
 pub struct ExecutionConfig {
     /// Number of concurrent threads for query execution.
-    concurrency: usize,
+    pub concurrency: usize,
+    /// Default batch size when reading data sources
+    pub batch_size: usize,
+    /// Optional physical planner to override the default physical planner
+    physical_planner: Option<Arc<dyn PhysicalPlanner>>,
 }
 
 impl ExecutionConfig {
@@ -822,6 +457,8 @@ impl ExecutionConfig {
     pub fn new() -> Self {
         Self {
             concurrency: num_cpus::get(),
+            batch_size: 4096,
+            physical_planner: None,
         }
     }
 
@@ -832,14 +469,34 @@ impl ExecutionConfig {
         self.concurrency = n;
         self
     }
+
+    /// Customize batch size
+    ///
+    /// Sets the default number of rows per batch when reading data sources.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `n` is zero.
+    pub fn with_batch_size(mut self, n: usize) -> Self {
+        // batch size must be greater than zero
+        assert!(n > 0, "batch size must be greater than zero");
+        self.batch_size = n;
+        self
+    }
+
+    /// Use a custom physical planner instead of the built-in default one.
+    ///
+    /// The supplied planner is consulted when the context turns logical plans
+    /// into physical plans.
+    pub fn with_physical_planner(
+        self,
+        physical_planner: Arc<dyn PhysicalPlanner>,
+    ) -> Self {
+        Self {
+            physical_planner: Some(physical_planner),
+            ..self
+        }
+    }
 }
 
 /// Execution context for registering data sources and executing queries
 #[derive(Clone)]
 pub struct ExecutionContextState {
-    datasources: Arc<Mutex<HashMap<String, Box<dyn TableProvider + Send + Sync>>>>,
-    scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
-    config: ExecutionConfig,
+    /// Data sources that are registered with the context
+    pub datasources: Arc<Mutex<HashMap<String, Box<dyn TableProvider + Send + Sync>>>>,
+    /// Scalar functions that are registered with the context
+    pub scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>,
+    /// Context configuration
+    pub config: ExecutionConfig,
 }
 
 impl SchemaProvider for ExecutionContextState {
@@ -911,7 +568,7 @@ mod tests {
             ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
         let logical_plan = ctx.optimize(&logical_plan)?;
 
-        let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;
+        let physical_plan = ctx.create_physical_plan(&logical_plan)?;
 
         let results = ctx.collect(physical_plan.as_ref())?;
 
@@ -955,7 +612,7 @@ mod tests {
         \n  TableScan: test projection=Some([1])";
         assert_eq!(format!("{:?}", optimized_plan), expected);
 
-        let physical_plan = ctx.create_physical_plan(&optimized_plan, 1024)?;
+        let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
 
         assert_eq!(1, physical_plan.schema().fields().len());
         assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
@@ -990,7 +647,7 @@ mod tests {
             .build()?;
 
         let plan = ctx.optimize(&plan)?;
-        let physical_plan = ctx.create_physical_plan(&Arc::new(plan), 1024)?;
+        let physical_plan = ctx.create_physical_plan(&Arc::new(plan))?;
         assert_eq!(
             physical_plan.schema().field_with_name("c1")?.is_nullable(),
             false
@@ -1043,7 +700,7 @@ mod tests {
         \n  InMemoryScan: projection=Some([1])";
         assert_eq!(format!("{:?}", optimized_plan), expected);
 
-        let physical_plan = ctx.create_physical_plan(&optimized_plan, 1024)?;
+        let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
 
         assert_eq!(1, physical_plan.schema().fields().len());
         assert_eq!("b", physical_plan.schema().field(0).name().as_str());
@@ -1277,7 +934,7 @@ mod tests {
 
         let plan = ctx.optimize(&plan)?;
 
-        let physical_plan = ctx.create_physical_plan(&Arc::new(plan), 1024)?;
+        let physical_plan = ctx.create_physical_plan(&Arc::new(plan))?;
         assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
         assert_eq!(
             "total_salary",
@@ -1418,7 +1075,7 @@ mod tests {
         );
 
         let plan = ctx.optimize(&plan)?;
-        let plan = ctx.create_physical_plan(&plan, 1024)?;
+        let plan = ctx.create_physical_plan(&plan)?;
         let result = ctx.collect(plan.as_ref())?;
 
         let batch = &result[0];
@@ -1452,11 +1109,34 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn custom_physical_planner() -> Result<()> {
+        let mut ctx = ExecutionContext::with_config(
+            ExecutionConfig::new().with_physical_planner(Arc::new(MyPhysicalPlanner {})),
+        );
+        // A bare `expect_err` would also pass if the query failed before or
+        // outside the custom planner (the default planner, for example, rejects
+        // plan variants it has no arm for), so match on the exact error that
+        // `MyPhysicalPlanner` returns to prove it was actually invoked.
+        match ctx.sql("SELECT 1") {
+            Err(ExecutionError::NotImplemented(msg)) => {
+                assert_eq!("query not supported", msg)
+            }
+            Err(other) => panic!("unexpected error: {:?}", other),
+            Ok(_) => panic!("expected the custom physical planner to reject the query"),
+        }
+        Ok(())
+    }
+
+    /// Test planner that rejects every logical plan with a fixed error.
+    struct MyPhysicalPlanner {}
+
+    impl PhysicalPlanner for MyPhysicalPlanner {
+        fn create_physical_plan(
+            &self,
+            _logical_plan: &LogicalPlan,
+            _ctx_state: Arc<Mutex<ExecutionContextState>>,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Err(ExecutionError::NotImplemented(
+                "query not supported".to_string(),
+            ))
+        }
+    }
+
     /// Execute SQL and return results
     fn collect(ctx: &mut ExecutionContext, sql: &str) -> Result<Vec<RecordBatch>> {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
-        let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;
+        let physical_plan = ctx.create_physical_plan(&logical_plan)?;
         ctx.collect(physical_plan.as_ref())
     }
 
@@ -1480,7 +1160,7 @@ mod tests {
     fn write_csv(ctx: &mut ExecutionContext, sql: &str, out_dir: &str) -> Result<()> {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
-        let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;
+        let physical_plan = ctx.create_physical_plan(&logical_plan)?;
         ctx.write_csv(physical_plan.as_ref(), out_dir)
     }
 
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs
index bc7afd5..491cb70 100644
--- a/rust/datafusion/src/execution/dataframe_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -122,9 +122,9 @@ impl DataFrame for DataFrameImpl {
         self.plan.clone()
     }
 
-    fn collect(&self, batch_size: usize) -> Result<Vec<RecordBatch>> {
+    fn collect(&self) -> Result<Vec<RecordBatch>> {
         let mut ctx = ExecutionContext::from(self.ctx_state.clone());
-        ctx.collect_plan(&self.plan.clone(), batch_size)
+        ctx.collect_plan(&self.plan.clone())
     }
 
     /// Returns the schema from the logical plan
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index de76916..4b66955 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -23,7 +23,8 @@ use std::rc::Rc;
 use std::sync::{Arc, Mutex};
 
 use crate::error::Result;
-use crate::logicalplan::ScalarValue;
+use crate::execution::context::ExecutionContextState;
+use crate::logicalplan::{LogicalPlan, ScalarValue};
 use arrow::array::ArrayRef;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::{
@@ -32,6 +33,16 @@ use arrow::{
 };
 use udf::ScalarFunction;
 
+/// Physical query planner
+///
+/// Turns a logical plan into an executable physical plan. The default planner
+/// can be replaced through the execution configuration.
+pub trait PhysicalPlanner {
+    /// Create a physical plan from a logical plan
+    ///
+    /// `ctx_state` is the shared context state, which holds the registered data
+    /// sources, the registered scalar functions and the context configuration.
+    fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        ctx_state: Arc<Mutex<ExecutionContextState>>,
+    ) -> Result<Arc<dyn ExecutionPlan>>;
+}
+
 /// Partition-aware execution plan for a relation
 pub trait ExecutionPlan: Debug {
     /// Get the schema for this execution plan
@@ -104,6 +115,7 @@ pub mod math_expressions;
 pub mod memory;
 pub mod merge;
 pub mod parquet;
+pub mod planner;
 pub mod projection;
 pub mod selection;
 pub mod sort;
diff --git a/rust/datafusion/src/execution/physical_plan/planner.rs b/rust/datafusion/src/execution/physical_plan/planner.rs
new file mode 100644
index 0000000..7ce845b
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/planner.rs
@@ -0,0 +1,467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Physical query planner
+
+use std::sync::{Arc, Mutex};
+
+use crate::error::{ExecutionError, Result};
+use crate::execution::context::ExecutionContextState;
+use crate::execution::physical_plan::csv::{CsvExec, CsvReadOptions};
+use crate::execution::physical_plan::datasource::DatasourceExec;
+use crate::execution::physical_plan::explain::ExplainExec;
+use crate::execution::physical_plan::expressions::{
+    Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, PhysicalSortExpr, Sum,
+};
+use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
+use crate::execution::physical_plan::limit::GlobalLimitExec;
+use crate::execution::physical_plan::memory::MemoryExec;
+use crate::execution::physical_plan::merge::MergeExec;
+use crate::execution::physical_plan::parquet::ParquetExec;
+use crate::execution::physical_plan::projection::ProjectionExec;
+use crate::execution::physical_plan::selection::SelectionExec;
+use crate::execution::physical_plan::sort::SortExec;
+use crate::execution::physical_plan::udf::ScalarFunctionExpr;
+use crate::execution::physical_plan::{
+    AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner,
+};
+use crate::logicalplan::{Expr, LogicalPlan, PlanType, StringifiedPlan};
+use arrow::compute::SortOptions;
+use arrow::datatypes::Schema;
+
+/// Default physical query planner
+///
+/// Stateless; construct it with `PhysicalPlannerImpl::default()`.
+#[derive(Default)]
+pub struct PhysicalPlannerImpl {}
+
+impl PhysicalPlanner for PhysicalPlannerImpl {
+    /// Create a physical plan from a logical plan
+    ///
+    /// Child plans are planned recursively with the same `ctx_state`. Scans use
+    /// the configured batch size; merges, sorts and limits use the configured
+    /// concurrency.
+    fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        ctx_state: Arc<Mutex<ExecutionContextState>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Read the configuration once, under a single short-lived lock, instead
+        // of re-locking the context in every arm that needs a setting.
+        let (batch_size, concurrency) = {
+            let state = ctx_state.lock().expect("failed to lock mutex");
+            (state.config.batch_size, state.config.concurrency)
+        };
+
+        match logical_plan {
+            LogicalPlan::TableScan {
+                table_name,
+                projection,
+                ..
+            } => match ctx_state
+                .lock()
+                .expect("failed to lock mutex")
+                .datasources
+                .lock()
+                .expect("failed to lock mutex")
+                .get(table_name)
+            {
+                // Both guards are temporaries of the match scrutinee and stay
+                // held for this whole match, so these arms must not lock
+                // `ctx_state` again.
+                Some(provider) => {
+                    let partitions = provider.scan(projection, batch_size)?;
+                    if partitions.is_empty() {
+                        Err(ExecutionError::General(
+                            "Table provider returned no partitions".to_string(),
+                        ))
+                    } else {
+                        let schema = match projection {
+                            None => provider.schema().clone(),
+                            Some(p) => Arc::new(Schema::new(
+                                p.iter()
+                                    .map(|i| provider.schema().field(*i).clone())
+                                    .collect(),
+                            )),
+                        };
+
+                        let exec = DatasourceExec::new(schema, partitions);
+                        Ok(Arc::new(exec))
+                    }
+                }
+                None => Err(ExecutionError::General(format!(
+                    "No table named {}",
+                    table_name
+                ))),
+            },
+            LogicalPlan::InMemoryScan {
+                data,
+                projection,
+                projected_schema,
+                ..
+            } => Ok(Arc::new(MemoryExec::try_new(
+                data,
+                Arc::new(projected_schema.as_ref().to_owned()),
+                projection.to_owned(),
+            )?)),
+            LogicalPlan::CsvScan {
+                path,
+                schema,
+                has_header,
+                delimiter,
+                projection,
+                ..
+            } => Ok(Arc::new(CsvExec::try_new(
+                path,
+                CsvReadOptions::new()
+                    .schema(schema.as_ref())
+                    .delimiter_option(*delimiter)
+                    .has_header(*has_header),
+                projection.to_owned(),
+                batch_size,
+            )?)),
+            LogicalPlan::ParquetScan {
+                path, projection, ..
+            } => Ok(Arc::new(ParquetExec::try_new(
+                path,
+                projection.to_owned(),
+                batch_size,
+            )?)),
+            LogicalPlan::Projection { input, expr, .. } => {
+                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input_schema = input.schema();
+                let runtime_expr = expr
+                    .iter()
+                    .map(|e| {
+                        tuple_err((
+                            self.create_physical_expr(
+                                e,
+                                &input_schema,
+                                ctx_state.clone(),
+                            ),
+                            e.name(&input_schema),
+                        ))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
+            }
+            LogicalPlan::Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                ..
+            } => {
+                // Initially need to perform the aggregate and then merge the partitions
+                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input_schema = input.schema();
+
+                let groups = group_expr
+                    .iter()
+                    .map(|e| {
+                        tuple_err((
+                            self.create_physical_expr(
+                                e,
+                                &input_schema,
+                                ctx_state.clone(),
+                            ),
+                            e.name(&input_schema),
+                        ))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let aggregates = aggr_expr
+                    .iter()
+                    .map(|e| {
+                        tuple_err((
+                            self.create_aggregate_expr(
+                                e,
+                                &input_schema,
+                                ctx_state.clone(),
+                            ),
+                            e.name(&input_schema),
+                        ))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                let initial_aggr = HashAggregateExec::try_new(
+                    groups.clone(),
+                    aggregates.clone(),
+                    input,
+                )?;
+
+                let schema = initial_aggr.schema();
+                let partitions = initial_aggr.partitions()?;
+
+                // A single partition is already fully aggregated.
+                if partitions.len() == 1 {
+                    return Ok(Arc::new(initial_aggr));
+                }
+
+                let merge = Arc::new(MergeExec::new(schema, partitions, concurrency));
+
+                // construct the expressions for the final aggregation
+                let (final_group, final_aggr) = initial_aggr.make_final_expr(
+                    groups.iter().map(|x| x.1.clone()).collect(),
+                    aggregates.iter().map(|x| x.1.clone()).collect(),
+                );
+
+                // construct a second aggregation, keeping the final column name equal to the first aggregation
+                // and the expressions corresponding to the respective aggregate
+                Ok(Arc::new(HashAggregateExec::try_new(
+                    final_group
+                        .iter()
+                        .enumerate()
+                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
+                        .collect(),
+                    final_aggr
+                        .iter()
+                        .enumerate()
+                        .map(|(i, expr)| (expr.clone(), aggregates[i].1.clone()))
+                        .collect(),
+                    merge,
+                )?))
+            }
+            LogicalPlan::Selection { input, expr, .. } => {
+                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input_schema = input.schema();
+                let runtime_expr =
+                    self.create_physical_expr(expr, &input_schema, ctx_state)?;
+                Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?))
+            }
+            LogicalPlan::Sort { expr, input, .. } => {
+                let input = self.create_physical_plan(input, ctx_state.clone())?;
+                let input_schema = input.schema();
+
+                let sort_expr = expr
+                    .iter()
+                    .map(|e| match e {
+                        Expr::Sort {
+                            expr,
+                            asc,
+                            nulls_first,
+                        } => self.create_physical_sort_expr(
+                            expr,
+                            &input_schema,
+                            SortOptions {
+                                descending: !*asc,
+                                nulls_first: *nulls_first,
+                            },
+                            ctx_state.clone(),
+                        ),
+                        _ => Err(ExecutionError::ExecutionError(
+                            "Sort only accepts sort expressions".to_string(),
+                        )),
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                Ok(Arc::new(SortExec::try_new(sort_expr, input, concurrency)?))
+            }
+            LogicalPlan::Limit { input, n, .. } => {
+                let input = self.create_physical_plan(input, ctx_state)?;
+
+                Ok(Arc::new(GlobalLimitExec::new(
+                    input.schema(),
+                    input.partitions()?,
+                    *n,
+                    concurrency,
+                )))
+            }
+            LogicalPlan::Explain {
+                verbose,
+                plan,
+                stringified_plans,
+                schema,
+            } => {
+                let input = self.create_physical_plan(plan, ctx_state)?;
+
+                let mut stringified_plans = stringified_plans
+                    .iter()
+                    .filter(|s| s.should_display(*verbose))
+                    .cloned()
+                    .collect::<Vec<_>>();
+
+                // add in the physical plan if requested
+                if *verbose {
+                    stringified_plans.push(StringifiedPlan::new(
+                        PlanType::PhysicalPlan,
+                        format!("{:#?}", input),
+                    ));
+                }
+                let schema_ref = Arc::new((**schema).clone());
+                Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
+            }
+            _ => Err(ExecutionError::General(
+                "Unsupported logical plan variant".to_string(),
+            )),
+        }
+    }
+}
+
+impl PhysicalPlannerImpl {
+    /// Create a physical expression from a logical expression
+    ///
+    /// Column references are checked against `input_schema`. Scalar function
+    /// calls are resolved through the functions registered in `ctx_state`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error in any of these cases:
+    /// - a referenced column is missing from `input_schema`;
+    /// - a scalar function is not registered;
+    /// - building a cast expression fails;
+    /// - the expression kind is not supported.
+    pub fn create_physical_expr(
+        &self,
+        e: &Expr,
+        input_schema: &Schema,
+        ctx_state: Arc<Mutex<ExecutionContextState>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        match e {
+            Expr::Alias(expr, ..) => {
+                self.create_physical_expr(expr, input_schema, ctx_state)
+            }
+            Expr::Column(name) => {
+                // check that name exists
+                input_schema.field_with_name(name)?;
+                Ok(Arc::new(Column::new(name)))
+            }
+            Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
+            Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
+                self.create_physical_expr(left, input_schema, ctx_state.clone())?,
+                op.clone(),
+                self.create_physical_expr(right, input_schema, ctx_state)?,
+            ))),
+            Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new(
+                self.create_physical_expr(expr, input_schema, ctx_state)?,
+                input_schema,
+                data_type.clone(),
+            )?)),
+            Expr::ScalarFunction {
+                name,
+                args,
+                return_type,
+            } => {
+                // Copy the implementation out of the registry and release both
+                // locks BEFORE planning the arguments. Arguments may themselves
+                // be scalar function calls that lock `ctx_state` again, and
+                // `std::sync::Mutex` is not re-entrant: holding the guards across
+                // the recursion (as a match-scrutinee temporary would) deadlocks.
+                let fun = {
+                    let state = ctx_state.lock().expect("failed to lock mutex");
+                    let functions = state
+                        .scalar_functions
+                        .lock()
+                        .expect("failed to lock mutex");
+                    functions.get(name).map(|f| f.fun.clone())
+                };
+                match fun {
+                    Some(fun) => {
+                        let physical_args = args
+                            .iter()
+                            .map(|e| {
+                                self.create_physical_expr(
+                                    e,
+                                    input_schema,
+                                    ctx_state.clone(),
+                                )
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+                        Ok(Arc::new(ScalarFunctionExpr::new(
+                            name,
+                            Box::new(fun),
+                            physical_args,
+                            return_type,
+                        )))
+                    }
+                    None => Err(ExecutionError::General(format!(
+                        "Invalid scalar function '{:?}'",
+                        name
+                    ))),
+                }
+            }
+            other => Err(ExecutionError::NotImplemented(format!(
+                "Physical plan does not support logical expression {:?}",
+                other
+            ))),
+        }
+    }
+
+    /// Create an aggregate expression from a logical expression
+    ///
+    /// Supports `sum`, `avg`, `max`, `min` and `count` (case-insensitive). Each
+    /// aggregates the physical expression planned from its first argument.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error in any of these cases:
+    /// - `e` is not an aggregate function;
+    /// - the function name is unsupported;
+    /// - a supported function has no argument;
+    /// - planning the argument fails.
+    pub fn create_aggregate_expr(
+        &self,
+        e: &Expr,
+        input_schema: &Schema,
+        ctx_state: Arc<Mutex<ExecutionContextState>>,
+    ) -> Result<Arc<dyn AggregateExpr>> {
+        match e {
+            Expr::AggregateFunction { name, args, .. } => {
+                // Plans the aggregate's input (its first argument) on demand, so
+                // unsupported names are still reported first. An empty argument
+                // list is reported as an error instead of panicking on `args[0]`.
+                let input = || -> Result<Arc<dyn PhysicalExpr>> {
+                    let arg = args.first().ok_or_else(|| {
+                        ExecutionError::General(format!(
+                            "Aggregate function '{}' requires an argument",
+                            name
+                        ))
+                    })?;
+                    self.create_physical_expr(arg, input_schema, ctx_state.clone())
+                };
+                match name.to_lowercase().as_ref() {
+                    "sum" => Ok(Arc::new(Sum::new(input()?))),
+                    "avg" => Ok(Arc::new(Avg::new(input()?))),
+                    "max" => Ok(Arc::new(Max::new(input()?))),
+                    "min" => Ok(Arc::new(Min::new(input()?))),
+                    "count" => Ok(Arc::new(Count::new(input()?))),
+                    other => Err(ExecutionError::NotImplemented(format!(
+                        "Unsupported aggregate function '{}'",
+                        other
+                    ))),
+                }
+            }
+            other => Err(ExecutionError::General(format!(
+                "Invalid aggregate expression '{:?}'",
+                other
+            ))),
+        }
+    }
+
+    /// Create a physical sort expression from a logical expression
+    ///
+    /// `options` (sort direction and null ordering) is attached unchanged to the
+    /// physical expression planned from `e`.
+    pub fn create_physical_sort_expr(
+        &self,
+        e: &Expr,
+        input_schema: &Schema,
+        options: SortOptions,
+        ctx_state: Arc<Mutex<ExecutionContextState>>,
+    ) -> Result<PhysicalSortExpr> {
+        Ok(PhysicalSortExpr {
+            expr: self.create_physical_expr(e, input_schema, ctx_state)?,
+            options,
+        })
+    }
+}
+
+/// Turn a pair of results into a result holding the pair.
+///
+/// If both elements are errors, the error from the first element is returned.
+fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
+    let (first, second) = value;
+    Ok((first?, second?))
+}
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index a258d3d..d2d5349 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -31,8 +31,6 @@ use datafusion::execution::context::ExecutionContext;
 use datafusion::execution::physical_plan::udf::ScalarFunction;
 use datafusion::logicalplan::LogicalPlan;
 
-const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
-
 #[test]
 fn nyc() -> Result<()> {
     // schema for nyxtaxi csv files
@@ -111,7 +109,7 @@ fn parquet_single_nan_schema() {
     let sql = "SELECT mycol FROM single_nan";
     let plan = ctx.create_logical_plan(&sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap();
+    let plan = ctx.create_physical_plan(&plan).unwrap();
     let results = ctx.collect(plan.as_ref()).unwrap();
     for batch in results {
         assert_eq!(1, batch.num_rows());
@@ -277,7 +275,7 @@ fn csv_query_avg_multi_batch() -> Result<()> {
     let sql = "SELECT avg(c12) FROM aggregate_test_100";
     let plan = ctx.create_logical_plan(&sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap();
+    let plan = ctx.create_physical_plan(&plan).unwrap();
     let results = ctx.collect(plan.as_ref()).unwrap();
     let batch = &results[0];
     let column = batch.column(0);
@@ -489,9 +487,8 @@ fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
 
     // TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once
     // unsigned is supported.
-    ctx.sql(
-        &format!(
-            "
+    ctx.sql(&format!(
+        "
     CREATE EXTERNAL TABLE aggregate_test_100 (
         c1  VARCHAR NOT NULL,
         c2  INT NOT NULL,
@@ -511,10 +508,8 @@ fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
     WITH HEADER ROW
     LOCATION '{}/csv/aggregate_test_100.csv'
     ",
-            testdata
-        ),
-        1024,
-    )
+        testdata
+    ))
     .unwrap();
 }
 
@@ -542,7 +537,7 @@ fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
 fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<String> {
     let plan = ctx.create_logical_plan(&sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap();
+    let plan = ctx.create_physical_plan(&plan).unwrap();
     let results = ctx.collect(plan.as_ref()).unwrap();
     result_str(&results)
 }