Posted to commits@arrow.apache.org by al...@apache.org on 2021/09/21 09:57:34 UTC

[arrow-datafusion] branch master updated: Make TableProvider.scan() and PhysicalPlanner::create_physical_plan() async (#1013)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 299ab7d  Make TableProvider.scan() and PhysicalPlanner::create_physical_plan() async (#1013)
299ab7d is described below

commit 299ab7d1c37c707fcd500d3428abbdbe4dc5399b
Author: rdettai <rd...@gmail.com>
AuthorDate: Tue Sep 21 11:57:27 2021 +0200

    Make TableProvider.scan() and PhysicalPlanner::create_physical_plan() async (#1013)
    
    * [feat] make TableProvider.scan() async
    
    * [fix] python bindings
    
    * [fix] phantom files
    
    * [fix] removed parallelization
    
    * [fix] remove async from sql after rebase
---
 ballista/rust/core/src/utils.rs              |    4 +-
 ballista/rust/scheduler/src/lib.rs           |    2 +
 ballista/rust/scheduler/src/planner.rs       |  205 +++---
 benchmarks/src/bin/nyctaxi.rs                |    2 +-
 benchmarks/src/bin/tpch.rs                   |   14 +-
 datafusion/src/datasource/avro.rs            |    8 +-
 datafusion/src/datasource/csv.rs             |    4 +-
 datafusion/src/datasource/datasource.rs      |    5 +-
 datafusion/src/datasource/empty.rs           |    4 +-
 datafusion/src/datasource/json.rs            |    5 +-
 datafusion/src/datasource/memory.rs          |   18 +-
 datafusion/src/datasource/parquet.rs         |    8 +-
 datafusion/src/execution/context.rs          |   79 +-
 datafusion/src/execution/dataframe_impl.rs   |    2 +-
 datafusion/src/optimizer/filter_push_down.rs |    4 +-
 datafusion/src/physical_plan/mod.rs          |   45 +-
 datafusion/src/physical_plan/planner.rs      | 1025 +++++++++++++-------------
 datafusion/tests/custom_sources.rs           |   10 +-
 datafusion/tests/parquet_pruning.rs          |    1 +
 datafusion/tests/provider_filter_pushdown.rs |    3 +-
 datafusion/tests/sql.rs                      |   26 +-
 datafusion/tests/statistics.rs               |   28 +-
 datafusion/tests/user_defined_plan.rs        |    7 +-
 python/src/dataframe.rs                      |   31 +-
 24 files changed, 822 insertions(+), 718 deletions(-)
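
A note on the mechanical pattern repeated throughout the diff below: at the
time of this commit, Rust does not support `async fn` in traits natively, so
every changed trait and impl gains a `#[async_trait]` attribute from the
async-trait crate, which rewrites the method to return a boxed future. A
minimal sketch of the pattern, using a hypothetical `Scannable` trait in place
of the real `TableProvider` (whose full signature appears in datasource.rs
below):

    use async_trait::async_trait;

    #[async_trait]
    pub trait Scannable: Sync + Send {
        // The macro desugars this into a method returning
        // Pin<Box<dyn Future<Output = Result<(), String>> + Send + '_>>.
        async fn scan(&self) -> Result<(), String>;
    }

    struct MyTable;

    #[async_trait]
    impl Scannable for MyTable {
        async fn scan(&self) -> Result<(), String> {
            // Illustrative body only; the real scan() also takes
            // projection, batch_size, filters and limit.
            Ok(())
        }
    }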

diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index c731e60..fd12eb9 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -30,6 +30,7 @@ use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionStats;
 
 use crate::config::BallistaConfig;
+use async_trait::async_trait;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::{
@@ -269,8 +270,9 @@ impl BallistaQueryPlanner {
     }
 }
 
+#[async_trait]
 impl QueryPlanner for BallistaQueryPlanner {
-    fn create_physical_plan(
+    async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
         _ctx_state: &ExecutionContextState,
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index f03d08b..47caf4c 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -418,6 +418,7 @@ impl SchedulerGrpc for SchedulerServer {
 
                 let plan = fail_job!(datafusion_ctx
                     .create_physical_plan(&optimized_plan)
+                    .await
                     .map_err(|e| {
                         let msg = format!("Could not create physical plan: {}", e);
                         error!("{}", msg);
@@ -447,6 +448,7 @@ impl SchedulerGrpc for SchedulerServer {
                 let mut planner = DistributedPlanner::new();
                 let stages = fail_job!(planner
                     .plan_query_stages(&job_id_spawn, plan)
+                    .await
                     .map_err(|e| {
                         let msg = format!("Could not plan query stages: {}", e);
                         error!("{}", msg);
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 2872ff9..3d5712a 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -31,6 +31,8 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::windows::WindowAggExec;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use log::info;
 
 type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -55,14 +57,15 @@ impl DistributedPlanner {
     /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
     /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
     /// A [ShuffleWriterExec] is created whenever the partitioning changes.
-    pub fn plan_query_stages(
-        &mut self,
-        job_id: &str,
+    pub async fn plan_query_stages<'a>(
+        &'a mut self,
+        job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Vec<Arc<ShuffleWriterExec>>> {
         info!("planning query stages");
-        let (new_plan, mut stages) =
-            self.plan_query_stages_internal(job_id, execution_plan)?;
+        let (new_plan, mut stages) = self
+            .plan_query_stages_internal(job_id, execution_plan)
+            .await?;
         stages.push(create_shuffle_writer(
             job_id,
             self.next_stage_id(),
@@ -75,91 +78,95 @@ impl DistributedPlanner {
     /// Returns a potentially modified version of the input execution_plan along with the resulting query stages.
     /// This function is needed because the input execution_plan might need to be modified, but it might not hold a
     /// complete query stage (its parent might also belong to the same stage)
-    fn plan_query_stages_internal(
-        &mut self,
-        job_id: &str,
+    fn plan_query_stages_internal<'a>(
+        &'a mut self,
+        job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<PartialQueryStageResult> {
-        // recurse down and replace children
-        if execution_plan.children().is_empty() {
-            return Ok((execution_plan, vec![]));
-        }
+    ) -> BoxFuture<'a, Result<PartialQueryStageResult>> {
+        async move {
+            // recurse down and replace children
+            if execution_plan.children().is_empty() {
+                return Ok((execution_plan, vec![]));
+            }
 
-        let mut stages = vec![];
-        let mut children = vec![];
-        for child in execution_plan.children() {
-            let (new_child, mut child_stages) =
-                self.plan_query_stages_internal(job_id, child.clone())?;
-            children.push(new_child);
-            stages.append(&mut child_stages);
-        }
+            let mut stages = vec![];
+            let mut children = vec![];
+            for child in execution_plan.children() {
+                let (new_child, mut child_stages) = self
+                    .plan_query_stages_internal(job_id, child.clone())
+                    .await?;
+                children.push(new_child);
+                stages.append(&mut child_stages);
+            }
 
-        if let Some(coalesce) = execution_plan
-            .as_any()
-            .downcast_ref::<CoalescePartitionsExec>()
-        {
-            let shuffle_writer = create_shuffle_writer(
-                job_id,
-                self.next_stage_id(),
-                children[0].clone(),
-                None,
-            )?;
-            let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
-                shuffle_writer.stage_id(),
-                shuffle_writer.schema(),
-                shuffle_writer.output_partitioning().partition_count(),
-                shuffle_writer
-                    .shuffle_output_partitioning()
-                    .map(|p| p.partition_count())
-                    .unwrap_or_else(|| {
-                        shuffle_writer.output_partitioning().partition_count()
-                    }),
-            ));
-            stages.push(shuffle_writer);
-            Ok((
-                coalesce.with_new_children(vec![unresolved_shuffle])?,
-                stages,
-            ))
-        } else if let Some(repart) =
-            execution_plan.as_any().downcast_ref::<RepartitionExec>()
-        {
-            match repart.output_partitioning() {
-                Partitioning::Hash(_, _) => {
-                    let shuffle_writer = create_shuffle_writer(
-                        job_id,
-                        self.next_stage_id(),
-                        children[0].clone(),
-                        Some(repart.partitioning().to_owned()),
-                    )?;
-                    let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
-                        shuffle_writer.stage_id(),
-                        shuffle_writer.schema(),
-                        shuffle_writer.output_partitioning().partition_count(),
-                        shuffle_writer
-                            .shuffle_output_partitioning()
-                            .map(|p| p.partition_count())
-                            .unwrap_or_else(|| {
-                                shuffle_writer.output_partitioning().partition_count()
-                            }),
-                    ));
-                    stages.push(shuffle_writer);
-                    Ok((unresolved_shuffle, stages))
-                }
-                _ => {
-                    // remove any non-hash repartition from the distributed plan
-                    Ok((children[0].clone(), stages))
+            if let Some(coalesce) = execution_plan
+                .as_any()
+                .downcast_ref::<CoalescePartitionsExec>()
+            {
+                let shuffle_writer = create_shuffle_writer(
+                    job_id,
+                    self.next_stage_id(),
+                    children[0].clone(),
+                    None,
+                )?;
+                let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+                    shuffle_writer.stage_id(),
+                    shuffle_writer.schema(),
+                    shuffle_writer.output_partitioning().partition_count(),
+                    shuffle_writer
+                        .shuffle_output_partitioning()
+                        .map(|p| p.partition_count())
+                        .unwrap_or_else(|| {
+                            shuffle_writer.output_partitioning().partition_count()
+                        }),
+                ));
+                stages.push(shuffle_writer);
+                Ok((
+                    coalesce.with_new_children(vec![unresolved_shuffle])?,
+                    stages,
+                ))
+            } else if let Some(repart) =
+                execution_plan.as_any().downcast_ref::<RepartitionExec>()
+            {
+                match repart.output_partitioning() {
+                    Partitioning::Hash(_, _) => {
+                        let shuffle_writer = create_shuffle_writer(
+                            job_id,
+                            self.next_stage_id(),
+                            children[0].clone(),
+                            Some(repart.partitioning().to_owned()),
+                        )?;
+                        let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+                            shuffle_writer.stage_id(),
+                            shuffle_writer.schema(),
+                            shuffle_writer.output_partitioning().partition_count(),
+                            shuffle_writer
+                                .shuffle_output_partitioning()
+                                .map(|p| p.partition_count())
+                                .unwrap_or_else(|| {
+                                    shuffle_writer.output_partitioning().partition_count()
+                                }),
+                        ));
+                        stages.push(shuffle_writer);
+                        Ok((unresolved_shuffle, stages))
+                    }
+                    _ => {
+                        // remove any non-hash repartition from the distributed plan
+                        Ok((children[0].clone(), stages))
+                    }
                 }
+            } else if let Some(window) =
+                execution_plan.as_any().downcast_ref::<WindowAggExec>()
+            {
+                Err(BallistaError::NotImplemented(format!(
+                    "WindowAggExec with window {:?}",
+                    window
+                )))
+            } else {
+                Ok((execution_plan.with_new_children(children)?, stages))
             }
-        } else if let Some(window) =
-            execution_plan.as_any().downcast_ref::<WindowAggExec>()
-        {
-            Err(BallistaError::NotImplemented(format!(
-                "WindowAggExec with window {:?}",
-                window
-            )))
-        } else {
-            Ok((execution_plan.with_new_children(children)?, stages))
         }
+        .boxed()
     }
 
     /// Generate a new stage ID
@@ -262,8 +269,8 @@ mod test {
         };
     }
 
-    #[test]
-    fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
+    #[tokio::test]
+    async fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
         let mut ctx = datafusion_test_context("testdata")?;
 
         // simplified form of TPC-H query 1
@@ -276,11 +283,13 @@ mod test {
 
         let plan = df.to_logical_plan();
         let plan = ctx.optimize(&plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
+        let plan = ctx.create_physical_plan(&plan).await?;
 
         let mut planner = DistributedPlanner::new();
         let job_uuid = Uuid::new_v4();
-        let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
+        let stages = planner
+            .plan_query_stages(&job_uuid.to_string(), plan)
+            .await?;
         for stage in &stages {
             println!("{}", displayable(stage.as_ref()).indent().to_string());
         }
@@ -345,8 +354,8 @@ mod test {
         Ok(())
     }
 
-    #[test]
-    fn distributed_join_plan() -> Result<(), BallistaError> {
+    #[tokio::test]
+    async fn distributed_join_plan() -> Result<(), BallistaError> {
         let mut ctx = datafusion_test_context("testdata")?;
 
         // simplified form of TPC-H query 12
@@ -386,11 +395,13 @@ order by
 
         let plan = df.to_logical_plan();
         let plan = ctx.optimize(&plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
+        let plan = ctx.create_physical_plan(&plan).await?;
 
         let mut planner = DistributedPlanner::new();
         let job_uuid = Uuid::new_v4();
-        let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
+        let stages = planner
+            .plan_query_stages(&job_uuid.to_string(), plan)
+            .await?;
         for stage in &stages {
             println!("{}", displayable(stage.as_ref()).indent().to_string());
         }
@@ -516,8 +527,8 @@ order by
         Ok(())
     }
 
-    #[test]
-    fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
+    #[tokio::test]
+    async fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
         let mut ctx = datafusion_test_context("testdata")?;
 
         // simplified form of TPC-H query 1
@@ -530,11 +541,13 @@ order by
 
         let plan = df.to_logical_plan();
         let plan = ctx.optimize(&plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
+        let plan = ctx.create_physical_plan(&plan).await?;
 
         let mut planner = DistributedPlanner::new();
         let job_uuid = Uuid::new_v4();
-        let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
+        let stages = planner
+            .plan_query_stages(&job_uuid.to_string(), plan)
+            .await?;
 
         let partial_hash = stages[0].children()[0].clone();
         let partial_hash_serde = roundtrip_operator(partial_hash.clone())?;
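
The rewrite of plan_query_stages_internal above (and of create_initial_plan in
datafusion's planner.rs further down) follows the standard recipe for
recursion in async Rust: a recursive `async fn` would have an infinitely sized
future type, so the function instead returns a `BoxFuture` and boxes each
level of recursion with `.boxed()`. A stripped-down sketch of the same recipe
over a hypothetical `Node` tree:

    use futures::future::BoxFuture;
    use futures::FutureExt;

    struct Node {
        children: Vec<Node>,
    }

    // Returning BoxFuture instead of declaring `async fn` breaks the
    // recursive type cycle: the future is heap-allocated, so its size
    // is known even though it awaits itself for each child.
    fn count_nodes(node: &Node) -> BoxFuture<'_, usize> {
        async move {
            let mut total = 1;
            for child in &node.children {
                total += count_nodes(child).await;
            }
            total
        }
        .boxed()
    }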
diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs
index 9387530..a88494f 100644
--- a/benchmarks/src/bin/nyctaxi.rs
+++ b/benchmarks/src/bin/nyctaxi.rs
@@ -121,7 +121,7 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Resu
     if debug {
         println!("Optimized logical plan:\n{:?}", plan);
     }
-    let physical_plan = ctx.create_physical_plan(&plan)?;
+    let physical_plan = ctx.create_physical_plan(&plan).await?;
     let result = collect(physical_plan).await?;
     if debug {
         pretty::print_batches(&result)?;
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 852e499..2e9b2ff 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -348,7 +348,7 @@ async fn execute_query(
     if debug {
         println!("=== Optimized logical plan ===\n{:?}\n", plan);
     }
-    let physical_plan = ctx.create_physical_plan(&plan)?;
+    let physical_plan = ctx.create_physical_plan(&plan).await?;
     if debug {
         println!(
             "=== Physical plan ===\n{}\n",
@@ -394,7 +394,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
         // create the physical plan
         let csv = csv.to_logical_plan();
         let csv = ctx.optimize(&csv)?;
-        let csv = ctx.create_physical_plan(&csv)?;
+        let csv = ctx.create_physical_plan(&csv).await?;
 
         let output_path = output_root_path.join(table);
         let output_path = output_path.to_str().unwrap().to_owned();
@@ -1063,7 +1063,7 @@ mod tests {
         use datafusion::physical_plan::ExecutionPlan;
         use std::convert::TryInto;
 
-        fn round_trip_query(n: usize) -> Result<()> {
+        async fn round_trip_query(n: usize) -> Result<()> {
             let config = ExecutionConfig::new()
                 .with_target_partitions(1)
                 .with_batch_size(10);
@@ -1110,7 +1110,7 @@ mod tests {
 
             // test physical plan roundtrip
             if env::var("TPCH_DATA").is_ok() {
-                let physical_plan = ctx.create_physical_plan(&plan)?;
+                let physical_plan = ctx.create_physical_plan(&plan).await?;
                 let proto: protobuf::PhysicalPlanNode =
                     (physical_plan.clone()).try_into().unwrap();
                 let round_trip: Arc<dyn ExecutionPlan> = (&proto).try_into().unwrap();
@@ -1126,9 +1126,9 @@ mod tests {
 
         macro_rules! test_round_trip {
             ($tn:ident, $query:expr) => {
-                #[test]
-                fn $tn() -> Result<()> {
-                    round_trip_query($query)
+                #[tokio::test]
+                async fn $tn() -> Result<()> {
+                    round_trip_query($query).await
                 }
             };
         }
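
The #[test] to #[tokio::test] conversions scattered through this commit follow
directly from planning becoming async: the test body now needs a runtime to
.await on, and #[tokio::test] provides one per test. A minimal illustration
(not a test from this commit):

    // #[tokio::test] wraps the async body in a runtime, roughly
    // equivalent to Runtime::new().unwrap().block_on(async { ... }).
    #[tokio::test]
    async fn awaits_inside_a_test() {
        let value = async { 21 * 2 }.await;
        assert_eq!(value, 42);
    }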
diff --git a/datafusion/src/datasource/avro.rs b/datafusion/src/datasource/avro.rs
index ee0fabf..ee5cea5 100644
--- a/datafusion/src/datasource/avro.rs
+++ b/datafusion/src/datasource/avro.rs
@@ -27,6 +27,7 @@ use std::{
 };
 
 use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
 
 use crate::physical_plan::avro::{AvroExec, AvroReadOptions};
 use crate::{
@@ -120,6 +121,7 @@ impl AvroFile {
     }
 }
 
+#[async_trait]
 impl TableProvider for AvroFile {
     fn as_any(&self) -> &dyn Any {
         self
@@ -129,7 +131,7 @@ impl TableProvider for AvroFile {
         self.schema.clone()
     }
 
-    fn scan(
+    async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
@@ -185,7 +187,7 @@ mod tests {
     async fn read_small_batches() -> Result<()> {
         let table = load_table("alltypes_plain.avro")?;
         let projection = None;
-        let exec = table.scan(&projection, 2, &[], None)?;
+        let exec = table.scan(&projection, 2, &[], None).await?;
         let stream = exec.execute(0).await?;
 
         let _ = stream
@@ -414,7 +416,7 @@ mod tests {
         table: Arc<dyn TableProvider>,
         projection: &Option<Vec<usize>>,
     ) -> Result<RecordBatch> {
-        let exec = table.scan(projection, 1024, &[], None)?;
+        let exec = table.scan(projection, 1024, &[], None).await?;
         let mut it = exec.execute(0).await?;
         it.next()
             .await
diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs
index 971bd91..d47312e 100644
--- a/datafusion/src/datasource/csv.rs
+++ b/datafusion/src/datasource/csv.rs
@@ -34,6 +34,7 @@
 //! ```
 
 use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
 use std::any::Any;
 use std::io::{Read, Seek};
 use std::string::String;
@@ -157,6 +158,7 @@ impl CsvFile {
     }
 }
 
+#[async_trait]
 impl TableProvider for CsvFile {
     fn as_any(&self) -> &dyn Any {
         self
@@ -166,7 +168,7 @@ impl TableProvider for CsvFile {
         self.schema.clone()
     }
 
-    fn scan(
+    async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
diff --git a/datafusion/src/datasource/datasource.rs b/datafusion/src/datasource/datasource.rs
index 3c60255..918200f 100644
--- a/datafusion/src/datasource/datasource.rs
+++ b/datafusion/src/datasource/datasource.rs
@@ -20,6 +20,8 @@
 use std::any::Any;
 use std::sync::Arc;
 
+use async_trait::async_trait;
+
 use crate::arrow::datatypes::SchemaRef;
 use crate::error::Result;
 use crate::logical_plan::Expr;
@@ -54,6 +56,7 @@ pub enum TableType {
 }
 
 /// Source table
+#[async_trait]
 pub trait TableProvider: Sync + Send {
     /// Returns the table provider as [`Any`](std::any::Any) so that it can be
     /// downcast to a specific implementation.
@@ -68,7 +71,7 @@ pub trait TableProvider: Sync + Send {
     }
 
     /// Create an ExecutionPlan that will scan the table.
-    fn scan(
+    async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
diff --git a/datafusion/src/datasource/empty.rs b/datafusion/src/datasource/empty.rs
index 183db76..380c5a7 100644
--- a/datafusion/src/datasource/empty.rs
+++ b/datafusion/src/datasource/empty.rs
@@ -21,6 +21,7 @@ use std::any::Any;
 use std::sync::Arc;
 
 use arrow::datatypes::*;
+use async_trait::async_trait;
 
 use crate::datasource::TableProvider;
 use crate::error::Result;
@@ -39,6 +40,7 @@ impl EmptyTable {
     }
 }
 
+#[async_trait]
 impl TableProvider for EmptyTable {
     fn as_any(&self) -> &dyn Any {
         self
@@ -48,7 +50,7 @@ impl TableProvider for EmptyTable {
         self.schema.clone()
     }
 
-    fn scan(
+    async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         _batch_size: usize,
diff --git a/datafusion/src/datasource/json.rs b/datafusion/src/datasource/json.rs
index f4e6782..1a6ec7a 100644
--- a/datafusion/src/datasource/json.rs
+++ b/datafusion/src/datasource/json.rs
@@ -36,6 +36,7 @@ use crate::{
     },
 };
 use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable};
+use async_trait::async_trait;
 
 trait SeekRead: Read + Seek {}
 
@@ -101,6 +102,8 @@ impl NdJsonFile {
         })
     }
 }
+
+#[async_trait]
 impl TableProvider for NdJsonFile {
     fn as_any(&self) -> &dyn Any {
         self
@@ -110,7 +113,7 @@ impl TableProvider for NdJsonFile {
         self.schema.clone()
     }
 
-    fn scan(
+    async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
diff --git a/datafusion/src/datasource/memory.rs b/datafusion/src/datasource/memory.rs
index 67b0e7b..b47e7e1 100644
--- a/datafusion/src/datasource/memory.rs
+++ b/datafusion/src/datasource/memory.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
 
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
@@ -66,7 +67,7 @@ impl MemTable {
         output_partitions: Option<usize>,
     ) -> Result<Self> {
         let schema = t.schema();
-        let exec = t.scan(&None, batch_size, &[], None)?;
+        let exec = t.scan(&None, batch_size, &[], None).await?;
         let partition_count = exec.output_partitioning().partition_count();
 
         let tasks = (0..partition_count)
@@ -114,6 +115,7 @@ impl MemTable {
     }
 }
 
+#[async_trait]
 impl TableProvider for MemTable {
     fn as_any(&self) -> &dyn Any {
         self
@@ -123,7 +125,7 @@ impl TableProvider for MemTable {
         self.schema.clone()
     }
 
-    fn scan(
+    async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         _batch_size: usize,
@@ -168,7 +170,7 @@ mod tests {
         let provider = MemTable::try_new(schema, vec![vec![batch]])?;
 
         // scan with projection
-        let exec = provider.scan(&Some(vec![2, 1]), 1024, &[], None)?;
+        let exec = provider.scan(&Some(vec![2, 1]), 1024, &[], None).await?;
         let mut it = exec.execute(0).await?;
         let batch2 = it.next().await.unwrap()?;
         assert_eq!(2, batch2.schema().fields().len());
@@ -198,7 +200,7 @@ mod tests {
 
         let provider = MemTable::try_new(schema, vec![vec![batch]])?;
 
-        let exec = provider.scan(&None, 1024, &[], None)?;
+        let exec = provider.scan(&None, 1024, &[], None).await?;
         let mut it = exec.execute(0).await?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
@@ -207,8 +209,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_invalid_projection() -> Result<()> {
+    #[tokio::test]
+    async fn test_invalid_projection() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Int32, false),
             Field::new("b", DataType::Int32, false),
@@ -228,7 +230,7 @@ mod tests {
 
         let projection: Vec<usize> = vec![0, 4];
 
-        match provider.scan(&Some(projection), 1024, &[], None) {
+        match provider.scan(&Some(projection), 1024, &[], None).await {
             Err(DataFusionError::Internal(e)) => {
                 assert_eq!("\"Projection index out of range\"", format!("{:?}", e))
             }
@@ -349,7 +351,7 @@ mod tests {
         let provider =
             MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;
 
-        let exec = provider.scan(&None, 1024, &[], None)?;
+        let exec = provider.scan(&None, 1024, &[], None).await?;
         let mut it = exec.execute(0).await?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index 8dc9bc5..65c9089 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -21,6 +21,7 @@ use std::any::Any;
 use std::fs::File;
 use std::sync::Arc;
 
+use async_trait::async_trait;
 use parquet::arrow::ArrowReader;
 use parquet::arrow::ParquetFileArrowReader;
 use parquet::file::serialized_reader::SerializedFileReader;
@@ -110,6 +111,7 @@ impl ParquetTable {
     }
 }
 
+#[async_trait]
 impl TableProvider for ParquetTable {
     fn as_any(&self) -> &dyn Any {
         self
@@ -129,7 +131,7 @@ impl TableProvider for ParquetTable {
 
     /// Scan the file(s), using the provided projection, and return one BatchIterator per
     /// partition.
-    fn scan(
+    async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
@@ -414,7 +416,7 @@ mod tests {
     async fn read_small_batches() -> Result<()> {
         let table = load_table("alltypes_plain.parquet")?;
         let projection = None;
-        let exec = table.scan(&projection, 2, &[], None)?;
+        let exec = table.scan(&projection, 2, &[], None).await?;
         let stream = exec.execute(0).await?;
 
         let _ = stream
@@ -635,7 +637,7 @@ mod tests {
         table: Arc<dyn TableProvider>,
         projection: &Option<Vec<usize>>,
     ) -> Result<RecordBatch> {
-        let exec = table.scan(projection, 1024, &[], None)?;
+        let exec = table.scan(projection, 1024, &[], None).await?;
         let mut it = exec.execute(0).await?;
         it.next()
             .await
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 6789b79..00adbb0 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -81,6 +81,7 @@ use crate::sql::{
 };
 use crate::variable::{VarProvider, VarType};
 use crate::{dataframe::DataFrame, physical_plan::udaf::AggregateUDF};
+use async_trait::async_trait;
 use chrono::{DateTime, Utc};
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
@@ -533,17 +534,27 @@ impl ExecutionContext {
     }
 
     /// Creates a physical plan from a logical plan.
-    pub fn create_physical_plan(
+    pub async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut state = self.state.lock().unwrap();
-        state.execution_props.start_execution();
+        let (state, planner) = {
+            let mut state = self.state.lock().unwrap();
+            state.execution_props.start_execution();
+
+            // We need to clone `state` to release the lock, whose guard is not `Send`. We
+            // could make the lock `Send` by using `tokio::sync::Mutex`, but that would
+            // require propagating async even to the `LogicalPlan` building methods.
+            // Cloning `state` here is fine because we then pass it on as an immutable
+            // `&state`, so the clone is never written to and there are no write
+            // consistency issues. Any modifications applied to the original state after
+            // the clone will not be picked up by it, but that is okay: it is equivalent
+            // to postponing the state update by holding the lock until the end of the
+            // function scope.
+            (state.clone(), Arc::clone(&state.config.query_planner))
+        };
 
-        state
-            .config
-            .query_planner
-            .create_physical_plan(logical_plan, &state)
+        planner.create_physical_plan(logical_plan, &state).await
     }
 
     /// Executes a query and writes the results to a partitioned CSV file.
@@ -676,9 +687,10 @@ impl FunctionRegistry for ExecutionContext {
 }
 
 /// A planner used to add extensions to DataFusion logical and physical plans.
+#[async_trait]
 pub trait QueryPlanner {
     /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
-    fn create_physical_plan(
+    async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
@@ -688,15 +700,16 @@ pub trait QueryPlanner {
 /// The query planner used if no user defined planner is provided
 struct DefaultQueryPlanner {}
 
+#[async_trait]
 impl QueryPlanner for DefaultQueryPlanner {
     /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
-    fn create_physical_plan(
+    async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let planner = DefaultPhysicalPlanner::default();
-        planner.create_physical_plan(logical_plan, ctx_state)
+        planner.create_physical_plan(logical_plan, ctx_state).await
     }
 }
 
@@ -1054,6 +1067,7 @@ mod tests {
     use arrow::compute::add;
     use arrow::datatypes::*;
     use arrow::record_batch::RecordBatch;
+    use async_trait::async_trait;
     use std::fs::File;
     use std::sync::Weak;
     use std::thread::{self, JoinHandle};
@@ -1214,7 +1228,7 @@ mod tests {
             ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
         let logical_plan = ctx.optimize(&logical_plan)?;
 
-        let physical_plan = ctx.create_physical_plan(&logical_plan)?;
+        let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
 
         let results = collect_partitioned(physical_plan).await?;
 
@@ -1290,7 +1304,7 @@ mod tests {
         \n  TableScan: test projection=Some([1])";
         assert_eq!(format!("{:?}", optimized_plan), expected);
 
-        let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
+        let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
 
         assert_eq!(1, physical_plan.schema().fields().len());
         assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
@@ -1301,8 +1315,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn preserve_nullability_on_projection() -> Result<()> {
+    #[tokio::test]
+    async fn preserve_nullability_on_projection() -> Result<()> {
         let tmp_dir = TempDir::new()?;
         let ctx = create_ctx(&tmp_dir, 1)?;
 
@@ -1314,7 +1328,7 @@ mod tests {
             .build()?;
 
         let plan = ctx.optimize(&plan)?;
-        let physical_plan = ctx.create_physical_plan(&Arc::new(plan))?;
+        let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
         assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
         Ok(())
     }
@@ -1366,7 +1380,7 @@ mod tests {
         );
         assert_eq!(format!("{:?}", optimized_plan), expected);
 
-        let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
+        let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
 
         assert_eq!(1, physical_plan.schema().fields().len());
         assert_eq!("b", physical_plan.schema().field(0).name().as_str());
@@ -2442,8 +2456,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn aggregate_with_alias() -> Result<()> {
+    #[tokio::test]
+    async fn aggregate_with_alias() -> Result<()> {
         let tmp_dir = TempDir::new()?;
         let ctx = create_ctx(&tmp_dir, 1)?;
 
@@ -2459,7 +2473,7 @@ mod tests {
 
         let plan = ctx.optimize(&plan)?;
 
-        let physical_plan = ctx.create_physical_plan(&Arc::new(plan))?;
+        let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
         assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
         assert_eq!(
             "total_salary",
@@ -2873,8 +2887,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn send_context_to_threads() -> Result<()> {
+    #[tokio::test]
+    async fn send_context_to_threads() -> Result<()> {
         // ensure ExecutionContexts can be used in a multi-threaded
         // environment. Use case is for concurrent planning.
         let tmp_dir = TempDir::new()?;
@@ -2900,8 +2914,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn ctx_sql_should_optimize_plan() -> Result<()> {
+    #[tokio::test]
+    async fn ctx_sql_should_optimize_plan() -> Result<()> {
         let mut ctx = ExecutionContext::new();
         let plan1 =
             ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
@@ -2977,7 +2991,7 @@ mod tests {
         );
 
         let plan = ctx.optimize(&plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
+        let plan = ctx.create_physical_plan(&plan).await?;
         let result = collect(plan).await?;
 
         let expected = vec![
@@ -3247,6 +3261,7 @@ mod tests {
     async fn information_schema_tables_table_types() {
         struct TestTable(TableType);
 
+        #[async_trait]
         impl TableProvider for TestTable {
             fn as_any(&self) -> &dyn std::any::Any {
                 self
@@ -3260,7 +3275,7 @@ mod tests {
                 unimplemented!()
             }
 
-            fn scan(
+            async fn scan(
                 &self,
                 _: &Option<Vec<usize>>,
                 _: usize,
@@ -3764,8 +3779,9 @@ mod tests {
 
     struct MyPhysicalPlanner {}
 
+    #[async_trait]
     impl PhysicalPlanner for MyPhysicalPlanner {
-        fn create_physical_plan(
+        async fn create_physical_plan(
             &self,
             _logical_plan: &LogicalPlan,
             _ctx_state: &ExecutionContextState,
@@ -3788,14 +3804,17 @@ mod tests {
 
     struct MyQueryPlanner {}
 
+    #[async_trait]
     impl QueryPlanner for MyQueryPlanner {
-        fn create_physical_plan(
+        async fn create_physical_plan(
             &self,
             logical_plan: &LogicalPlan,
             ctx_state: &ExecutionContextState,
         ) -> Result<Arc<dyn ExecutionPlan>> {
             let physical_planner = MyPhysicalPlanner {};
-            physical_planner.create_physical_plan(logical_plan, ctx_state)
+            physical_planner
+                .create_physical_plan(logical_plan, ctx_state)
+                .await
         }
     }
 
@@ -3822,7 +3841,7 @@ mod tests {
     ) -> Result<()> {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
-        let physical_plan = ctx.create_physical_plan(&logical_plan)?;
+        let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
         ctx.write_csv(physical_plan, out_dir.to_string()).await
     }
 
@@ -3835,7 +3854,7 @@ mod tests {
     ) -> Result<()> {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
-        let physical_plan = ctx.create_physical_plan(&logical_plan)?;
+        let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
         ctx.write_parquet(physical_plan, out_dir.to_string(), writer_properties)
             .await
     }
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 724a3f8..cd39dd6 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -57,7 +57,7 @@ impl DataFrameImpl {
         let state = self.ctx_state.lock().unwrap().clone();
         let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
         let plan = ctx.optimize(&self.plan)?;
-        ctx.create_physical_plan(&plan)
+        ctx.create_physical_plan(&plan).await
     }
 }
 
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index a51fbc2..594e371 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -549,6 +549,7 @@ mod tests {
     use crate::test::*;
     use crate::{logical_plan::col, prelude::JoinType};
     use arrow::datatypes::SchemaRef;
+    use async_trait::async_trait;
 
     fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
         let rule = FilterPushDown::new();
@@ -1129,6 +1130,7 @@ mod tests {
         pub filter_support: TableProviderFilterPushDown,
     }
 
+    #[async_trait]
     impl TableProvider for PushDownProvider {
         fn schema(&self) -> SchemaRef {
             Arc::new(arrow::datatypes::Schema::new(vec![
@@ -1140,7 +1142,7 @@ mod tests {
             ]))
         }
 
-        fn scan(
+        async fn scan(
             &self,
             _: &Option<Vec<usize>>,
             _: usize,
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index d12b217..f0b5622 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -193,31 +193,34 @@ pub trait ExecutionPlan: Debug + Send + Sync {
 /// use datafusion::prelude::*;
 /// use datafusion::physical_plan::displayable;
 ///
-/// // Hard code target_partitions as it appears in the RepartitionExec output
-/// let config = ExecutionConfig::new()
-///     .with_target_partitions(3);
-/// let mut ctx = ExecutionContext::with_config(config);
+/// #[tokio::main]
+/// async fn main() {
+///   // Hard code target_partitions as it appears in the RepartitionExec output
+///   let config = ExecutionConfig::new()
+///       .with_target_partitions(3);
+///   let mut ctx = ExecutionContext::with_config(config);
 ///
-/// // register the a table
-/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap();
+///   // register a table
+///   ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap();
 ///
-/// // create a plan to run a SQL query
-/// let plan = ctx
-///    .create_logical_plan("SELECT a FROM example WHERE a < 5")
-///    .unwrap();
-/// let plan = ctx.optimize(&plan).unwrap();
-/// let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+///   // create a plan to run a SQL query
+///   let plan = ctx
+///      .create_logical_plan("SELECT a FROM example WHERE a < 5")
+///      .unwrap();
+///   let plan = ctx.optimize(&plan).unwrap();
+///   let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
 ///
-/// // Format using display string
-/// let displayable_plan = displayable(physical_plan.as_ref());
-/// let plan_string = format!("{}", displayable_plan.indent());
+///   // Format using display string
+///   let displayable_plan = displayable(physical_plan.as_ref());
+///   let plan_string = format!("{}", displayable_plan.indent());
 ///
-/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
-///            \n  CoalesceBatchesExec: target_batch_size=4096\
-///            \n    FilterExec: a@0 < 5\
-///            \n      RepartitionExec: partitioning=RoundRobinBatch(3)\
-///            \n        CsvExec: source=Path(tests/example.csv: [tests/example.csv]), has_header=true",
-///             plan_string.trim());
+///   assert_eq!("ProjectionExec: expr=[a@0 as a]\
+///              \n  CoalesceBatchesExec: target_batch_size=4096\
+///              \n    FilterExec: a@0 < 5\
+///              \n      RepartitionExec: partitioning=RoundRobinBatch(3)\
+///              \n        CsvExec: source=Path(tests/example.csv: [tests/example.csv]), has_header=true",
+///               plan_string.trim());
+/// }
 /// ```
 ///
 pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 55dc936..06f3a1d 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -54,7 +54,10 @@ use crate::{
 use arrow::compute::SortOptions;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::{compute::can_cast_types, datatypes::DataType};
+use async_trait::async_trait;
 use expressions::col;
+use futures::future::BoxFuture;
+use futures::{FutureExt, StreamExt, TryStreamExt};
 use log::debug;
 use std::sync::Arc;
 
@@ -182,9 +185,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
 
 /// Physical query planner that converts a `LogicalPlan` to an
 /// `ExecutionPlan` suitable for execution.
+#[async_trait]
 pub trait PhysicalPlanner {
     /// Create a physical plan from a logical plan
-    fn create_physical_plan(
+    async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
@@ -243,17 +247,18 @@ impl Default for DefaultPhysicalPlanner {
     }
 }
 
+#[async_trait]
 impl PhysicalPlanner for DefaultPhysicalPlanner {
     /// Create a physical plan from a logical plan
-    fn create_physical_plan(
+    async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match self.handle_explain(logical_plan, ctx_state)? {
+        match self.handle_explain(logical_plan, ctx_state).await? {
             Some(plan) => Ok(plan),
             None => {
-                let plan = self.create_initial_plan(logical_plan, ctx_state)?;
+                let plan = self.create_initial_plan(logical_plan, ctx_state).await?;
                 self.optimize_internal(plan, ctx_state, |_, _| {})
             }
         }
@@ -296,95 +301,343 @@ impl DefaultPhysicalPlanner {
     }
 
     /// Create a physical plan from a logical plan
-    fn create_initial_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        ctx_state: &ExecutionContextState,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let batch_size = ctx_state.config.batch_size;
-
-        match logical_plan {
-            LogicalPlan::TableScan {
-                source,
-                projection,
-                filters,
-                limit,
-                ..
-            } => {
-                // Remove all qualifiers from the scan as the provider
-                // doesn't know (nor should care) how the relation was
-                // referred to in the query
-                let filters = unnormalize_cols(filters.iter().cloned());
-                source.scan(projection, batch_size, &filters, *limit)
-            }
-            LogicalPlan::Window {
-                input, window_expr, ..
-            } => {
-                if window_expr.is_empty() {
-                    return Err(DataFusionError::Internal(
-                        "Impossibly got empty window expression".to_owned(),
-                    ));
+    fn create_initial_plan<'a>(
+        &'a self,
+        logical_plan: &'a LogicalPlan,
+        ctx_state: &'a ExecutionContextState,
+    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
+        async move {
+            let batch_size = ctx_state.config.batch_size;
+
+            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
+                LogicalPlan::TableScan {
+                    source,
+                    projection,
+                    filters,
+                    limit,
+                    ..
+                } => {
+                    // Remove all qualifiers from the scan as the provider
+                    // doesn't know (nor should care) how the relation was
+                    // referred to in the query
+                    let filters = unnormalize_cols(filters.iter().cloned());
+                    source.scan(projection, batch_size, &filters, *limit).await
                 }
+                LogicalPlan::Window {
+                    input, window_expr, ..
+                } => {
+                    if window_expr.is_empty() {
+                        return Err(DataFusionError::Internal(
+                            "Impossibly got empty window expression".to_owned(),
+                        ));
+                    }
+
+                    let input_exec = self.create_initial_plan(input, ctx_state).await?;
 
-                let input_exec = self.create_initial_plan(input, ctx_state)?;
+                    // at this moment we are guaranteed by the logical planner
+                    // to have all the window_expr to have equal sort key
+                    let partition_keys = window_expr_common_partition_keys(window_expr)?;
 
-                // at this moment we are guaranteed by the logical planner
-                // to have all the window_expr to have equal sort key
-                let partition_keys = window_expr_common_partition_keys(window_expr)?;
+                    let can_repartition = !partition_keys.is_empty()
+                        && ctx_state.config.target_partitions > 1
+                        && ctx_state.config.repartition_windows;
+
+                    let input_exec = if can_repartition {
+                        let partition_keys = partition_keys
+                            .iter()
+                            .map(|e| {
+                                self.create_physical_expr(
+                                    e,
+                                    input.schema(),
+                                    &input_exec.schema(),
+                                    ctx_state,
+                                )
+                            })
+                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
+                        Arc::new(RepartitionExec::try_new(
+                            input_exec,
+                            Partitioning::Hash(
+                                partition_keys,
+                                ctx_state.config.target_partitions,
+                            ),
+                        )?)
+                    } else {
+                        input_exec
+                    };
+
+                    // add a sort phase
+                    let get_sort_keys = |expr: &Expr| match expr {
+                        Expr::WindowFunction {
+                            ref partition_by,
+                            ref order_by,
+                            ..
+                        } => generate_sort_key(partition_by, order_by),
+                        _ => unreachable!(),
+                    };
+                    let sort_keys = get_sort_keys(&window_expr[0]);
+                    if window_expr.len() > 1 {
+                        debug_assert!(
+                            window_expr[1..]
+                                .iter()
+                                .all(|expr| get_sort_keys(expr) == sort_keys),
+                            "all window expressions shall have the same sort keys, as guaranteed by logical planning"
+                        );
+                    }
 
-                let can_repartition = !partition_keys.is_empty()
-                    && ctx_state.config.target_partitions > 1
-                    && ctx_state.config.repartition_windows;
+                    let logical_input_schema = input.schema();
 
-                let input_exec = if can_repartition {
-                    let partition_keys = partition_keys
+                    let input_exec = if sort_keys.is_empty() {
+                        input_exec
+                    } else {
+                        let physical_input_schema = input_exec.schema();
+                        let sort_keys = sort_keys
+                            .iter()
+                            .map(|e| match e {
+                                Expr::Sort {
+                                    expr,
+                                    asc,
+                                    nulls_first,
+                                } => self.create_physical_sort_expr(
+                                    expr,
+                                    logical_input_schema,
+                                    &physical_input_schema,
+                                    SortOptions {
+                                        descending: !*asc,
+                                        nulls_first: *nulls_first,
+                                    },
+                                    ctx_state,
+                                ),
+                                _ => unreachable!(),
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+                        Arc::new(if can_repartition {
+                            SortExec::new_with_partitioning(sort_keys, input_exec, true)
+                        } else {
+                            SortExec::try_new(sort_keys, input_exec)?
+                        })
+                    };
+
+                    let physical_input_schema = input_exec.schema();
+                    let window_expr = window_expr
                         .iter()
                         .map(|e| {
-                            self.create_physical_expr(
+                            self.create_window_expr(
                                 e,
-                                input.schema(),
-                                &input_exec.schema(),
+                                logical_input_schema,
+                                &physical_input_schema,
                                 ctx_state,
                             )
                         })
-                        .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
-                    Arc::new(RepartitionExec::try_new(
+                        .collect::<Result<Vec<_>>>()?;
+
+                    Ok(Arc::new(WindowAggExec::try_new(
+                        window_expr,
                         input_exec,
-                        Partitioning::Hash(
-                            partition_keys,
-                            ctx_state.config.target_partitions,
-                        ),
-                    )?)
-                } else {
-                    input_exec
-                };
+                        physical_input_schema,
+                    )?))
+                }
+                LogicalPlan::Aggregate {
+                    input,
+                    group_expr,
+                    aggr_expr,
+                    ..
+                } => {
+                    // Initially need to perform the aggregate and then merge the partitions
+                    let input_exec = self.create_initial_plan(input, ctx_state).await?;
+                    let physical_input_schema = input_exec.schema();
+                    let logical_input_schema = input.as_ref().schema();
 
-                // add a sort phase
-                let get_sort_keys = |expr: &Expr| match expr {
-                    Expr::WindowFunction {
-                        ref partition_by,
-                        ref order_by,
-                        ..
-                    } => generate_sort_key(partition_by, order_by),
-                    _ => unreachable!(),
-                };
-                let sort_keys = get_sort_keys(&window_expr[0]);
-                if window_expr.len() > 1 {
-                    debug_assert!(
-                        window_expr[1..]
+                    let groups = group_expr
+                        .iter()
+                        .map(|e| {
+                            tuple_err((
+                                self.create_physical_expr(
+                                    e,
+                                    logical_input_schema,
+                                    &physical_input_schema,
+                                    ctx_state,
+                                ),
+                                physical_name(e),
+                            ))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    let aggregates = aggr_expr
+                        .iter()
+                        .map(|e| {
+                            self.create_aggregate_expr(
+                                e,
+                                logical_input_schema,
+                                &physical_input_schema,
+                                ctx_state,
+                            )
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                    let initial_aggr = Arc::new(HashAggregateExec::try_new(
+                        AggregateMode::Partial,
+                        groups.clone(),
+                        aggregates.clone(),
+                        input_exec,
+                        physical_input_schema.clone(),
+                    )?);
+
+                    // update group column indices based on partial aggregate plan evaluation
+                    let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
+                        .map(|i| col(&groups[i].1, &initial_aggr.schema()))
+                        .collect::<Result<_>>()?;
+
+                    // TODO: dictionary type not yet supported in Hash Repartition
+                    let contains_dict = groups
+                        .iter()
+                        .flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
+                        .any(|x| matches!(x, DataType::Dictionary(_, _)));
+
+                    let can_repartition = !groups.is_empty()
+                        && ctx_state.config.target_partitions > 1
+                        && ctx_state.config.repartition_aggregations
+                        && !contains_dict;
+
+                    let (initial_aggr, next_partition_mode): (
+                        Arc<dyn ExecutionPlan>,
+                        AggregateMode,
+                    ) = if can_repartition {
+                        // Divide partial hash aggregates into multiple partitions by hash key
+                        let hash_repartition = Arc::new(RepartitionExec::try_new(
+                            initial_aggr,
+                            Partitioning::Hash(
+                                final_group.clone(),
+                                ctx_state.config.target_partitions,
+                            ),
+                        )?);
+                        // Combine hash aggregates within the partition
+                        (hash_repartition, AggregateMode::FinalPartitioned)
+                    } else {
+                        // construct a second aggregation, keeping the final column names equal to
+                        // those of the first aggregation and the expressions corresponding to the
+                        // respective aggregates
+                        (initial_aggr, AggregateMode::Final)
+                    };
+
+                    Ok(Arc::new(HashAggregateExec::try_new(
+                        next_partition_mode,
+                        final_group
                             .iter()
-                            .all(|expr| get_sort_keys(expr) == sort_keys),
-                        "all window expressions shall have the same sort keys, as guaranteed by logical planning"
-                    );
+                            .enumerate()
+                            .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
+                            .collect(),
+                        aggregates,
+                        initial_aggr,
+                        physical_input_schema.clone(),
+                    )?))
                 }
+                LogicalPlan::Projection { input, expr, .. } => {
+                    let input_exec = self.create_initial_plan(input, ctx_state).await?;
+                    let input_schema = input.as_ref().schema();
 
-                let logical_input_schema = input.schema();
+                    let physical_exprs = expr
+                        .iter()
+                        .map(|e| {
+                            // For projections, SQL planner and logical plan builder may convert user
+                            // provided expressions into logical Column expressions if their results
+                            // are already provided from the input plans. Because we work with
+                            // qualified columns in the logical plan, derived columns involving
+                            // operators or functions will contain qualifiers as well. This will
+                            // result in logical columns with names like `SUM(t1.c1)`,
+                            // `t1.c1 + t1.c2`, etc.
+                            //
+                            // If we run these logical columns through the physical_name function,
+                            // we will get physical names with column qualifiers, which violates
+                            // DataFusion's field name semantics. To account for this, we need to
+                            // derive the physical name from the physical input instead.
+                            //
+                            // This depends on the invariant that logical schema field index MUST match
+                            // with physical schema field index.
+                            let physical_name = if let Expr::Column(col) = e {
+                                match input_schema.index_of_column(col) {
+                                    Ok(idx) => {
+                                        // index physical field using logical field index
+                                        Ok(input_exec.schema().field(idx).name().to_string())
+                                    }
+                                    // logical column is not a derived column, safe to pass along to
+                                    // physical_name
+                                    Err(_) => physical_name(e),
+                                }
+                            } else {
+                                physical_name(e)
+                            };
 
-                let input_exec = if sort_keys.is_empty() {
-                    input_exec
-                } else {
-                    let physical_input_schema = input_exec.schema();
-                    let sort_keys = sort_keys
+                            tuple_err((
+                                self.create_physical_expr(
+                                    e,
+                                    input_schema,
+                                    &input_exec.schema(),
+                                    ctx_state,
+                                ),
+                                physical_name,
+                            ))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                    Ok(Arc::new(ProjectionExec::try_new(
+                        physical_exprs,
+                        input_exec,
+                    )?))
+                }
+                LogicalPlan::Filter {
+                    input, predicate, ..
+                } => {
+                    let physical_input = self.create_initial_plan(input, ctx_state).await?;
+                    let input_schema = physical_input.as_ref().schema();
+                    let input_dfschema = input.as_ref().schema();
+                    let runtime_expr = self.create_physical_expr(
+                        predicate,
+                        input_dfschema,
+                        &input_schema,
+                        ctx_state,
+                    )?;
+                    Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
+                }
+                LogicalPlan::Union { inputs, .. } => {
+                    let physical_plans = futures::stream::iter(inputs)
+                        .then(|lp| self.create_initial_plan(lp, ctx_state))
+                        .try_collect::<Vec<_>>()
+                        .await?;
+                    Ok(Arc::new(UnionExec::new(physical_plans)))
+                }
+                LogicalPlan::Repartition {
+                    input,
+                    partitioning_scheme,
+                } => {
+                    let physical_input = self.create_initial_plan(input, ctx_state).await?;
+                    let input_schema = physical_input.schema();
+                    let input_dfschema = input.as_ref().schema();
+                    let physical_partitioning = match partitioning_scheme {
+                        LogicalPartitioning::RoundRobinBatch(n) => {
+                            Partitioning::RoundRobinBatch(*n)
+                        }
+                        LogicalPartitioning::Hash(expr, n) => {
+                            let runtime_expr = expr
+                                .iter()
+                                .map(|e| {
+                                    self.create_physical_expr(
+                                        e,
+                                        input_dfschema,
+                                        &input_schema,
+                                        ctx_state,
+                                    )
+                                })
+                                .collect::<Result<Vec<_>>>()?;
+                            Partitioning::Hash(runtime_expr, *n)
+                        }
+                    };
+                    Ok(Arc::new(RepartitionExec::try_new(
+                        physical_input,
+                        physical_partitioning,
+                    )?))
+                }
+                LogicalPlan::Sort { expr, input, .. } => {
+                    let physical_input = self.create_initial_plan(input, ctx_state).await?;
+                    let input_schema = physical_input.as_ref().schema();
+                    let input_dfschema = input.as_ref().schema();
+                    let sort_expr = expr
                         .iter()
                         .map(|e| match e {
                             Expr::Sort {
@@ -393,423 +646,175 @@ impl DefaultPhysicalPlanner {
                                 nulls_first,
                             } => self.create_physical_sort_expr(
                                 expr,
-                                logical_input_schema,
-                                &physical_input_schema,
+                                input_dfschema,
+                                &input_schema,
                                 SortOptions {
                                     descending: !*asc,
                                     nulls_first: *nulls_first,
                                 },
                                 ctx_state,
                             ),
-                            _ => unreachable!(),
+                            _ => Err(DataFusionError::Plan(
+                                "Sort only accepts sort expressions".to_string(),
+                            )),
                         })
                         .collect::<Result<Vec<_>>>()?;
-                    Arc::new(if can_repartition {
-                        SortExec::new_with_partitioning(sort_keys, input_exec, true)
-                    } else {
-                        SortExec::try_new(sort_keys, input_exec)?
-                    })
-                };
-
-                let physical_input_schema = input_exec.schema();
-                let window_expr = window_expr
-                    .iter()
-                    .map(|e| {
-                        self.create_window_expr(
-                            e,
-                            logical_input_schema,
-                            &physical_input_schema,
-                            ctx_state,
-                        )
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                Ok(Arc::new(WindowAggExec::try_new(
-                    window_expr,
-                    input_exec,
-                    physical_input_schema,
-                )?))
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                ..
-            } => {
-                // Initially need to perform the aggregate and then merge the partitions
-                let input_exec = self.create_initial_plan(input, ctx_state)?;
-                let physical_input_schema = input_exec.schema();
-                let logical_input_schema = input.as_ref().schema();
-
-                let groups = group_expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_physical_expr(
-                                e,
-                                logical_input_schema,
-                                &physical_input_schema,
-                                ctx_state,
-                            ),
-                            physical_name(e),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-                let aggregates = aggr_expr
-                    .iter()
-                    .map(|e| {
-                        self.create_aggregate_expr(
-                            e,
-                            logical_input_schema,
-                            &physical_input_schema,
-                            ctx_state,
-                        )
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                let initial_aggr = Arc::new(HashAggregateExec::try_new(
-                    AggregateMode::Partial,
-                    groups.clone(),
-                    aggregates.clone(),
-                    input_exec,
-                    physical_input_schema.clone(),
-                )?);
-
-                // update group column indices based on partial aggregate plan evaluation
-                let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
-                    .map(|i| col(&groups[i].1, &initial_aggr.schema()))
-                    .collect::<Result<_>>()?;
-
-                // TODO: dictionary type not yet supported in Hash Repartition
-                let contains_dict = groups
-                    .iter()
-                    .flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
-                    .any(|x| matches!(x, DataType::Dictionary(_, _)));
-
-                let can_repartition = !groups.is_empty()
-                    && ctx_state.config.target_partitions > 1
-                    && ctx_state.config.repartition_aggregations
-                    && !contains_dict;
-
-                let (initial_aggr, next_partition_mode): (
-                    Arc<dyn ExecutionPlan>,
-                    AggregateMode,
-                ) = if can_repartition {
-                    // Divide partial hash aggregates into multiple partitions by hash key
-                    let hash_repartition = Arc::new(RepartitionExec::try_new(
-                        initial_aggr,
-                        Partitioning::Hash(
-                            final_group.clone(),
-                            ctx_state.config.target_partitions,
-                        ),
-                    )?);
-                    // Combine hash aggregates within the partition
-                    (hash_repartition, AggregateMode::FinalPartitioned)
-                } else {
-                    // construct a second aggregation, keeping the final column name equal to the
-                    // first aggregation and the expressions corresponding to the respective aggregate
-                    (initial_aggr, AggregateMode::Final)
-                };
-
-                Ok(Arc::new(HashAggregateExec::try_new(
-                    next_partition_mode,
-                    final_group
+                    Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?))
+                }
+                LogicalPlan::Join {
+                    left,
+                    right,
+                    on: keys,
+                    join_type,
+                    ..
+                } => {
+                    let left_df_schema = left.schema();
+                    let physical_left = self.create_initial_plan(left, ctx_state).await?;
+                    let right_df_schema = right.schema();
+                    let physical_right = self.create_initial_plan(right, ctx_state).await?;
+                    let join_on = keys
                         .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
-                        .collect(),
-                    aggregates,
-                    initial_aggr,
-                    physical_input_schema.clone(),
-                )?))
-            }
-            LogicalPlan::Projection { input, expr, .. } => {
-                let input_exec = self.create_initial_plan(input, ctx_state)?;
-                let input_schema = input.as_ref().schema();
-
-                let physical_exprs = expr
-                    .iter()
-                    .map(|e| {
-                        // For projections, SQL planner and logical plan builder may convert user
-                        // provided expressions into logical Column expressions if their results
-                        // are already provided from the input plans. Because we work with
-                        // qualified columns in logical plane, derived columns involve operators or
-                        // functions will contain qualifers as well. This will result in logical
-                        // columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc.
-                        //
-                        // If we run these logical columns through physical_name function, we will
-                        // get physical names with column qualifiers, which violates Datafusion's
-                        // field name semantics. To account for this, we need to derive the
-                        // physical name from physical input instead.
-                        //
-                        // This depends on the invariant that logical schema field index MUST match
-                        // with physical schema field index.
-                        let physical_name = if let Expr::Column(col) = e {
-                            match input_schema.index_of_column(col) {
-                                Ok(idx) => {
-                                    // index physical field using logical field index
-                                    Ok(input_exec.schema().field(idx).name().to_string())
-                                }
-                                // logical column is not a derived column, safe to pass along to
-                                // physical_name
-                                Err(_) => physical_name(e),
-                            }
-                        } else {
-                            physical_name(e)
-                        };
-
-                        tuple_err((
-                            self.create_physical_expr(
-                                e,
-                                input_schema,
-                                &input_exec.schema(),
-                                ctx_state,
-                            ),
-                            physical_name,
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
+                        .map(|(l, r)| {
+                            Ok((
+                                Column::new(&l.name, left_df_schema.index_of_column(l)?),
+                                Column::new(&r.name, right_df_schema.index_of_column(r)?),
+                            ))
+                        })
+                        .collect::<Result<join_utils::JoinOn>>()?;
 
-                Ok(Arc::new(ProjectionExec::try_new(
-                    physical_exprs,
-                    input_exec,
-                )?))
-            }
-            LogicalPlan::Filter {
-                input, predicate, ..
-            } => {
-                let physical_input = self.create_initial_plan(input, ctx_state)?;
-                let input_schema = physical_input.as_ref().schema();
-                let input_dfschema = input.as_ref().schema();
-                let runtime_expr = self.create_physical_expr(
-                    predicate,
-                    input_dfschema,
-                    &input_schema,
-                    ctx_state,
-                )?;
-                Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
-            }
-            LogicalPlan::Union { inputs, .. } => {
-                let physical_plans = inputs
-                    .iter()
-                    .map(|input| self.create_initial_plan(input, ctx_state))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok(Arc::new(UnionExec::new(physical_plans)))
-            }
-            LogicalPlan::Repartition {
-                input,
-                partitioning_scheme,
-            } => {
-                let physical_input = self.create_initial_plan(input, ctx_state)?;
-                let input_schema = physical_input.schema();
-                let input_dfschema = input.as_ref().schema();
-                let physical_partitioning = match partitioning_scheme {
-                    LogicalPartitioning::RoundRobinBatch(n) => {
-                        Partitioning::RoundRobinBatch(*n)
-                    }
-                    LogicalPartitioning::Hash(expr, n) => {
-                        let runtime_expr = expr
+                    if ctx_state.config.target_partitions > 1
+                        && ctx_state.config.repartition_joins
+                    {
+                        let (left_expr, right_expr) = join_on
                             .iter()
-                            .map(|e| {
-                                self.create_physical_expr(
-                                    e,
-                                    input_dfschema,
-                                    &input_schema,
-                                    ctx_state,
+                            .map(|(l, r)| {
+                                (
+                                    Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
+                                    Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
                                 )
                             })
-                            .collect::<Result<Vec<_>>>()?;
-                        Partitioning::Hash(runtime_expr, *n)
-                    }
-                };
-                Ok(Arc::new(RepartitionExec::try_new(
-                    physical_input,
-                    physical_partitioning,
-                )?))
-            }
-            LogicalPlan::Sort { expr, input, .. } => {
-                let physical_input = self.create_initial_plan(input, ctx_state)?;
-                let input_schema = physical_input.as_ref().schema();
-                let input_dfschema = input.as_ref().schema();
-
-                let sort_expr = expr
-                    .iter()
-                    .map(|e| match e {
-                        Expr::Sort {
-                            expr,
-                            asc,
-                            nulls_first,
-                        } => self.create_physical_sort_expr(
-                            expr,
-                            input_dfschema,
-                            &input_schema,
-                            SortOptions {
-                                descending: !*asc,
-                                nulls_first: *nulls_first,
-                            },
-                            ctx_state,
-                        ),
-                        _ => Err(DataFusionError::Plan(
-                            "Sort only accepts sort expressions".to_string(),
-                        )),
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?))
-            }
-            LogicalPlan::Join {
-                left,
-                right,
-                on: keys,
-                join_type,
-                ..
-            } => {
-                let left_df_schema = left.schema();
-                let physical_left = self.create_initial_plan(left, ctx_state)?;
-                let right_df_schema = right.schema();
-                let physical_right = self.create_initial_plan(right, ctx_state)?;
-                let join_on = keys
-                    .iter()
-                    .map(|(l, r)| {
-                        Ok((
-                            Column::new(&l.name, left_df_schema.index_of_column(l)?),
-                            Column::new(&r.name, right_df_schema.index_of_column(r)?),
-                        ))
-                    })
-                    .collect::<Result<join_utils::JoinOn>>()?;
-
-                if ctx_state.config.target_partitions > 1
-                    && ctx_state.config.repartition_joins
-                {
-                    let (left_expr, right_expr) = join_on
-                        .iter()
-                        .map(|(l, r)| {
-                            (
-                                Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
-                                Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
-                            )
-                        })
-                        .unzip();
-
-                    // Use hash partition by default to parallelize hash joins
-                    Ok(Arc::new(HashJoinExec::try_new(
-                        Arc::new(RepartitionExec::try_new(
+                            .unzip();
+
+                        // Use hash partition by default to parallelize hash joins
+                        Ok(Arc::new(HashJoinExec::try_new(
+                            Arc::new(RepartitionExec::try_new(
+                                physical_left,
+                                Partitioning::Hash(
+                                    left_expr,
+                                    ctx_state.config.target_partitions,
+                                ),
+                            )?),
+                            Arc::new(RepartitionExec::try_new(
+                                physical_right,
+                                Partitioning::Hash(
+                                    right_expr,
+                                    ctx_state.config.target_partitions,
+                                ),
+                            )?),
+                            join_on,
+                            join_type,
+                            PartitionMode::Partitioned,
+                        )?))
+                    } else {
+                        Ok(Arc::new(HashJoinExec::try_new(
                             physical_left,
-                            Partitioning::Hash(
-                                left_expr,
-                                ctx_state.config.target_partitions,
-                            ),
-                        )?),
-                        Arc::new(RepartitionExec::try_new(
                             physical_right,
-                            Partitioning::Hash(
-                                right_expr,
-                                ctx_state.config.target_partitions,
-                            ),
-                        )?),
-                        join_on,
-                        join_type,
-                        PartitionMode::Partitioned,
-                    )?))
-                } else {
-                    Ok(Arc::new(HashJoinExec::try_new(
-                        physical_left,
-                        physical_right,
-                        join_on,
-                        join_type,
-                        PartitionMode::CollectLeft,
-                    )?))
+                            join_on,
+                            join_type,
+                            PartitionMode::CollectLeft,
+                        )?))
+                    }
                 }
-            }
-            LogicalPlan::CrossJoin { left, right, .. } => {
-                let left = self.create_initial_plan(left, ctx_state)?;
-                let right = self.create_initial_plan(right, ctx_state)?;
-                Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
-            }
-            LogicalPlan::EmptyRelation {
-                produce_one_row,
-                schema,
-            } => Ok(Arc::new(EmptyExec::new(
-                *produce_one_row,
-                SchemaRef::new(schema.as_ref().to_owned().into()),
-            ))),
-            LogicalPlan::Limit { input, n, .. } => {
-                let limit = *n;
-                let input = self.create_initial_plan(input, ctx_state)?;
-
-                // GlobalLimitExec requires a single partition for input
-                let input = if input.output_partitioning().partition_count() == 1 {
-                    input
-                } else {
-                    // Apply a LocalLimitExec to each partition. The optimizer will also insert
-                    // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
-                    Arc::new(LocalLimitExec::new(input, limit))
-                };
-
-                Ok(Arc::new(GlobalLimitExec::new(input, limit)))
-            }
-            LogicalPlan::CreateExternalTable { .. } => {
-                // There is no default plan for "CREATE EXTERNAL
-                // TABLE" -- it must be handled at a higher level (so
-                // that the appropriate table can be registered with
-                // the context)
-                Err(DataFusionError::Internal(
-                    "Unsupported logical plan: CreateExternalTable".to_string(),
-                ))
-            }
-            LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
-                "Unsupported logical plan: Explain must be root of the plan".to_string(),
-            )),
-            LogicalPlan::Analyze {
-                verbose,
-                input,
-                schema,
-            } => {
-                let input = self.create_initial_plan(input, ctx_state)?;
-                let schema = SchemaRef::new(schema.as_ref().to_owned().into());
-                Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema)))
-            }
-            LogicalPlan::Extension { node } => {
-                let physical_inputs = node
-                    .inputs()
-                    .into_iter()
-                    .map(|input_plan| self.create_initial_plan(input_plan, ctx_state))
-                    .collect::<Result<Vec<_>>>()?;
+                LogicalPlan::CrossJoin { left, right, .. } => {
+                    let left = self.create_initial_plan(left, ctx_state).await?;
+                    let right = self.create_initial_plan(right, ctx_state).await?;
+                    Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
+                }
+                LogicalPlan::EmptyRelation {
+                    produce_one_row,
+                    schema,
+                } => Ok(Arc::new(EmptyExec::new(
+                    *produce_one_row,
+                    SchemaRef::new(schema.as_ref().to_owned().into()),
+                ))),
+                LogicalPlan::Limit { input, n, .. } => {
+                    let limit = *n;
+                    let input = self.create_initial_plan(input, ctx_state).await?;
+
+                    // GlobalLimitExec requires a single partition for input
+                    let input = if input.output_partitioning().partition_count() == 1 {
+                        input
+                    } else {
+                        // Apply a LocalLimitExec to each partition. The optimizer will also insert
+                        // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
+                        Arc::new(LocalLimitExec::new(input, limit))
+                    };
 
-                let maybe_plan = self.extension_planners.iter().try_fold(
-                    None,
-                    |maybe_plan, planner| {
-                        if let Some(plan) = maybe_plan {
-                            Ok(Some(plan))
-                        } else {
-                            planner.plan_extension(
-                                self,
-                                node.as_ref(),
-                                &node.inputs(),
-                                &physical_inputs,
-                                ctx_state,
-                            )
-                        }
-                    },
-                )?;
-                let plan = maybe_plan.ok_or_else(|| DataFusionError::Plan(format!(
-                    "No installed planner was able to convert the custom node to an execution plan: {:?}", node
-                )))?;
-
-                // Ensure the ExecutionPlan's schema matches the
-                // declared logical schema to catch and warn about
-                // logic errors when creating user defined plans.
-                if !node.schema().matches_arrow_schema(&plan.schema()) {
-                    Err(DataFusionError::Plan(format!(
-                        "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
-                         LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
-                        node, node.schema(), plan.schema()
-                    )))
-                } else {
-                    Ok(plan)
+                    Ok(Arc::new(GlobalLimitExec::new(input, limit)))
                 }
-            }
-        }
+                LogicalPlan::CreateExternalTable { .. } => {
+                    // There is no default plan for "CREATE EXTERNAL
+                    // TABLE" -- it must be handled at a higher level (so
+                    // that the appropriate table can be registered with
+                    // the context)
+                    Err(DataFusionError::Internal(
+                        "Unsupported logical plan: CreateExternalTable".to_string(),
+                    ))
+                }
+                LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
+                    "Unsupported logical plan: Explain must be root of the plan".to_string(),
+                )),
+                LogicalPlan::Analyze {
+                    verbose,
+                    input,
+                    schema,
+                } => {
+                    let input = self.create_initial_plan(input, ctx_state).await?;
+                    let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+                    Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema)))
+                }
+                LogicalPlan::Extension { node } => {
+                    let physical_inputs = futures::stream::iter(node.inputs())
+                        .then(|lp| self.create_initial_plan(lp, ctx_state))
+                        .try_collect::<Vec<_>>()
+                        .await?;
+
+                    let maybe_plan = self.extension_planners.iter().try_fold(
+                        None,
+                        |maybe_plan, planner| {
+                            if let Some(plan) = maybe_plan {
+                                Ok(Some(plan))
+                            } else {
+                                planner.plan_extension(
+                                    self,
+                                    node.as_ref(),
+                                    &node.inputs(),
+                                    &physical_inputs,
+                                    ctx_state,
+                                )
+                            }
+                        },
+                    )?;
+                    let plan = maybe_plan.ok_or_else(|| DataFusionError::Plan(format!(
+                        "No installed planner was able to convert the custom node to an execution plan: {:?}", node
+                    )))?;
+
+                    // Ensure the ExecutionPlan's schema matches the
+                    // declared logical schema to catch and warn about
+                    // logic errors when creating user defined plans.
+                    if !node.schema().matches_arrow_schema(&plan.schema()) {
+                        Err(DataFusionError::Plan(format!(
+                            "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
+                            LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
+                            node, node.schema(), plan.schema()
+                        )))
+                    } else {
+                        Ok(plan)
+                    }
+                }
+            };
+            exec_plan
+        }.boxed()
     }
 
     /// Create a physical expression from a logical expression
@@ -1315,7 +1320,7 @@ impl DefaultPhysicalPlanner {
     /// Returns
     /// Some(plan) if optimized, and None if logical_plan was not an
     /// explain (and thus needs to be optimized as normal)
-    fn handle_explain(
+    async fn handle_explain(
         &self,
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
@@ -1332,7 +1337,7 @@ impl DefaultPhysicalPlanner {
 
             stringified_plans.push(plan.to_stringified(FinalLogicalPlan));
 
-            let input = self.create_initial_plan(plan, ctx_state)?;
+            let input = self.create_initial_plan(plan, ctx_state).await?;
 
             stringified_plans
                 .push(displayable(input.as_ref()).to_stringified(InitialPhysicalPlan));
@@ -1411,15 +1416,15 @@ mod tests {
         ExecutionContextState::new()
     }
 
-    fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
+    async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
         let mut ctx_state = make_ctx_state();
         ctx_state.config.target_partitions = 4;
         let planner = DefaultPhysicalPlanner::default();
-        planner.create_physical_plan(logical_plan, &ctx_state)
+        planner.create_physical_plan(logical_plan, &ctx_state).await
     }
 
-    #[test]
-    fn test_all_operators() -> Result<()> {
+    #[tokio::test]
+    async fn test_all_operators() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
@@ -1433,7 +1438,7 @@ mod tests {
             .limit(10)?
             .build()?;
 
-        let plan = plan(&logical_plan)?;
+        let plan = plan(&logical_plan).await?;
 
         // verify that the plan correctly casts u8 to i64
         // the cast here is implicit so has CastOptions with safe=true
@@ -1443,8 +1448,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_create_not() -> Result<()> {
+    #[tokio::test]
+    async fn test_create_not() -> Result<()> {
         let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
         let dfschema = DFSchema::try_from(schema.clone())?;
 
@@ -1463,8 +1468,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_with_csv_plan() -> Result<()> {
+    #[tokio::test]
+    async fn test_with_csv_plan() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
@@ -1473,7 +1478,7 @@ mod tests {
             .filter(col("c7").lt(col("c12")))?
             .build()?;
 
-        let plan = plan(&logical_plan)?;
+        let plan = plan(&logical_plan).await?;
 
         // c12 is f64, c7 is u8 -> cast c7 to f64
         // the cast here is implicit so has CastOptions with safe=true
@@ -1482,8 +1487,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn errors() -> Result<()> {
+    #[tokio::test]
+    async fn errors() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
         let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -1517,14 +1522,16 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn default_extension_planner() {
+    #[tokio::test]
+    async fn default_extension_planner() {
         let ctx_state = make_ctx_state();
         let planner = DefaultPhysicalPlanner::default();
         let logical_plan = LogicalPlan::Extension {
             node: Arc::new(NoOpExtensionNode::default()),
         };
-        let plan = planner.create_physical_plan(&logical_plan, &ctx_state);
+        let plan = planner
+            .create_physical_plan(&logical_plan, &ctx_state)
+            .await;
 
         let expected_error =
             "No installed planner was able to convert the custom node to an execution plan: NoOp";
@@ -1539,8 +1546,8 @@ mod tests {
         }
     }
 
-    #[test]
-    fn bad_extension_planner() {
+    #[tokio::test]
+    async fn bad_extension_planner() {
         // Test that creating an execution plan whose schema doesn't
         // match the logical plan's schema generates an error.
         let ctx_state = make_ctx_state();
@@ -1551,7 +1558,9 @@ mod tests {
         let logical_plan = LogicalPlan::Extension {
             node: Arc::new(NoOpExtensionNode::default()),
         };
-        let plan = planner.create_physical_plan(&logical_plan, &ctx_state);
+        let plan = planner
+            .create_physical_plan(&logical_plan, &ctx_state)
+            .await;
 
         let expected_error: &str = "Error during planning: \
         Extension planner for NoOp created an ExecutionPlan with mismatched schema. \
@@ -1584,8 +1593,8 @@ mod tests {
         }
     }
 
-    #[test]
-    fn in_list_types() -> Result<()> {
+    #[tokio::test]
+    async fn in_list_types() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
         let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -1600,7 +1609,7 @@ mod tests {
             .filter(col("c12").lt(lit(0.05)))?
             .project(vec![col("c1").in_list(list, false)])?
             .build()?;
-        let execution_plan = plan(&logical_plan)?;
+        let execution_plan = plan(&logical_plan).await?;
         // verify that the plan correctly adds cast from Int64(1) to Utf8
         let expected = "InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }], negated: false }";
         assert!(format!("{:?}", execution_plan).contains(expected));
@@ -1615,7 +1624,7 @@ mod tests {
             .filter(col("c12").lt(lit(0.05)))?
             .project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
             .build()?;
-        let execution_plan = plan(&logical_plan);
+        let execution_plan = plan(&logical_plan).await;
 
         let expected_error = "Unsupported CAST from Utf8 to Boolean";
         match execution_plan {
@@ -1631,8 +1640,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn hash_agg_input_schema() -> Result<()> {
+    #[tokio::test]
+    async fn hash_agg_input_schema() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
@@ -1646,7 +1655,7 @@ mod tests {
         .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
         .build()?;
 
-        let execution_plan = plan(&logical_plan)?;
+        let execution_plan = plan(&logical_plan).await?;
         let final_hash_agg = execution_plan
             .as_any()
             .downcast_ref::<HashAggregateExec>()
@@ -1662,8 +1671,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn hash_agg_group_by_partitioned() -> Result<()> {
+    #[tokio::test]
+    async fn hash_agg_group_by_partitioned() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
@@ -1672,7 +1681,7 @@ mod tests {
             .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
             .build()?;
 
-        let execution_plan = plan(&logical_plan)?;
+        let execution_plan = plan(&logical_plan).await?;
         let formatted = format!("{:?}", execution_plan);
 
         // Make sure the plan contains a FinalPartitioned, which means it will not use the Final
@@ -1682,8 +1691,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_explain() {
+    #[tokio::test]
+    async fn test_explain() {
         let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
 
         let logical_plan =
@@ -1694,7 +1703,7 @@ mod tests {
                 .build()
                 .unwrap();
 
-        let plan = plan(&logical_plan).unwrap();
+        let plan = plan(&logical_plan).await.unwrap();
         if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
             let stringified_plans = plan.stringified_plans();
             assert!(stringified_plans.len() >= 4);
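
The `.boxed()` call that now ends the planner body above is what makes this
refactor compile: `create_initial_plan` awaits itself for every child node,
and a plain recursive `async fn` is rejected because its anonymous future type
would have to contain itself. Wrapping the body in an `async move` block and
boxing it into a type-erased `BoxFuture` breaks that cycle. A minimal
standalone sketch of the pattern (illustrative names, not the DataFusion code
itself):

    use futures::future::{BoxFuture, FutureExt};

    struct Planner;

    impl Planner {
        // Returning a BoxFuture instead of declaring `async fn` lets the
        // body await a recursive call to itself.
        fn plan_depth<'a>(&'a self, n: u64) -> BoxFuture<'a, Result<u64, String>> {
            async move {
                if n == 0 {
                    return Ok(0);
                }
                let child = self.plan_depth(n - 1).await?;
                Ok(child + 1)
            }
            .boxed()
        }
    }
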
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index 31551a9..a6456f1 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -182,6 +182,7 @@ impl ExecutionPlan for CustomExecutionPlan {
     }
 }
 
+#[async_trait]
 impl TableProvider for CustomTableProvider {
     fn as_any(&self) -> &dyn Any {
         self
@@ -191,7 +192,7 @@ impl TableProvider for CustomTableProvider {
         TEST_CUSTOM_SCHEMA_REF!()
     }
 
-    fn scan(
+    async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         _batch_size: usize,
@@ -236,7 +237,7 @@ async fn custom_source_dataframe() -> Result<()> {
     );
     assert_eq!(format!("{:?}", optimized_plan), expected);
 
-    let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
+    let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
 
     assert_eq!(1, physical_plan.schema().fields().len());
     assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
@@ -260,7 +261,10 @@ async fn optimizers_catch_all_statistics() {
         .sql("SELECT count(*), min(c1), max(c1) from test")
         .unwrap();
 
-    let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+    let physical_plan = ctx
+        .create_physical_plan(&df.to_logical_plan())
+        .await
+        .unwrap();
 
     // when the optimization kicks in, the source is replaced by an EmptyExec
     assert!(
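
With `scan` now async, a table provider can await I/O (listing files or
fetching metadata from object storage, say) while building its ExecutionPlan,
which is the point of threading `async` through the planner. A minimal sketch
of a provider against this version of the trait, assuming the `async_trait`
crate and that the remaining trait methods keep their defaults; `EmptyExec` is
only a stand-in source:

    use std::any::Any;
    use std::sync::Arc;

    use async_trait::async_trait;
    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::TableProvider;
    use datafusion::error::Result;
    use datafusion::logical_plan::Expr;
    use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan};

    struct NoopProvider {
        schema: SchemaRef,
    }

    #[async_trait]
    impl TableProvider for NoopProvider {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            self.schema.clone()
        }

        async fn scan(
            &self,
            _projection: &Option<Vec<usize>>,
            _batch_size: usize,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            // Awaiting here is now legal, e.g. to resolve file listings
            // before deciding how to partition the scan.
            Ok(Arc::new(EmptyExec::new(false, self.schema.clone())))
        }
    }
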
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index 14f5dd2..511a9e6 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -533,6 +533,7 @@ impl ContextWithParquet {
         let physical_plan = self
             .ctx
             .create_physical_plan(&logical_plan)
+            .await
             .expect("creating physical plan");
 
         let results = datafusion::physical_plan::collect(physical_plan.clone())
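
Because the planner entry points are now async, every test that exercises them
moves from `#[test]` to `#[tokio::test]`, as seen throughout this diff. The
attribute spins up a Tokio runtime around the async test body so `.await` can
be used directly; a minimal sketch (requires tokio with the "macros" and "rt"
features):

    #[tokio::test]
    async fn awaits_inside_a_test() {
        // Any future can be awaited here, including plan creation.
        let answer = async { 21 * 2 }.await;
        assert_eq!(answer, 42);
    }
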
diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs
index e0102c4..653b96c 100644
--- a/datafusion/tests/provider_filter_pushdown.rs
+++ b/datafusion/tests/provider_filter_pushdown.rs
@@ -110,6 +110,7 @@ struct CustomProvider {
     one_batch: RecordBatch,
 }
 
+#[async_trait]
 impl TableProvider for CustomProvider {
     fn as_any(&self) -> &dyn std::any::Any {
         self
@@ -119,7 +120,7 @@ impl TableProvider for CustomProvider {
         self.zero_batch.schema()
     }
 
-    fn scan(
+    async fn scan(
         &self,
         _: &Option<Vec<usize>>,
         _: usize,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 4cd0ed3..a67c82b 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -188,7 +188,7 @@ async fn parquet_single_nan_schema() {
     let sql = "SELECT mycol FROM single_nan";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(plan).await.unwrap();
     for batch in results {
         assert_eq!(1, batch.num_rows());
@@ -223,7 +223,7 @@ async fn parquet_list_columns() {
     let sql = "SELECT int64_list, utf8_list FROM list_columns";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(plan).await.unwrap();
 
     //   int64_list              utf8_list
@@ -928,7 +928,7 @@ async fn csv_query_avg_multi_batch() -> Result<()> {
     let sql = "SELECT avg(c12) FROM aggregate_test_100";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(plan).await.unwrap();
     let batch = &results[0];
     let column = batch.column(0);
@@ -2366,7 +2366,7 @@ async fn explain_analyze_baseline_metrics() {
     println!("running query: {}", sql);
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+    let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(physical_plan.clone()).await.unwrap();
     let formatted = arrow::util::pretty::pretty_format_batches(&results).unwrap();
     println!("Query Output:\n\n{}", formatted);
@@ -2648,7 +2648,7 @@ async fn csv_explain_plans() {
     // Physical plan
     // Create plan
     let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
-    let plan = ctx.create_physical_plan(&plan).expect(&msg);
+    let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
     //
     // Execute plan
     let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
@@ -2845,7 +2845,7 @@ async fn csv_explain_verbose_plans() {
     // Physical plan
     // Create plan
     let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
-    let plan = ctx.create_physical_plan(&plan).expect(&msg);
+    let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
     //
     // Execute plan
     let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
@@ -3002,7 +3002,7 @@ async fn execute_to_batches(ctx: &mut ExecutionContext, sql: &str) -> Vec<Record
     let optimized_logical_schema = plan.schema();
 
     let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
-    let plan = ctx.create_physical_plan(&plan).expect(&msg);
+    let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
 
     let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
     let results = collect(plan).await.expect(&msg);
@@ -4401,7 +4401,7 @@ async fn test_current_timestamp_expressions_non_optimized() -> Result<()> {
     let plan = ctx.create_logical_plan(sql).expect(&msg);
 
     let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
-    let plan = ctx.create_physical_plan(&plan).expect(&msg);
+    let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
 
     let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
     let res = collect(plan).await.expect(&msg);
@@ -4439,7 +4439,7 @@ async fn test_cast_expressions_error() -> Result<()> {
     let sql = "SELECT CAST(c1 AS INT) FROM aggregate_test_100";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
     let result = collect(plan).await;
 
     match result {
@@ -4469,7 +4469,7 @@ async fn test_physical_plan_display_indent() {
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
 
-    let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+    let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
     let expected = vec![
         "GlobalLimitExec: limit=10",
         "  SortExec: [the_min@2 DESC]",
@@ -4517,7 +4517,7 @@ async fn test_physical_plan_display_indent_multi_children() {
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
 
-    let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+    let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
     let expected = vec![
         "ProjectionExec: expr=[c1@0 as c1]",
         "  CoalesceBatchesExec: target_batch_size=4096",
@@ -4555,7 +4555,7 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> {
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT COUNT(DISTINCT) FROM aggregate_test_100";
     let logical_plan = ctx.create_logical_plan(sql)?;
-    let physical_plan = ctx.create_physical_plan(&logical_plan);
+    let physical_plan = ctx.create_physical_plan(&logical_plan).await;
     let err = physical_plan.unwrap_err();
     assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'");
     Ok(())
@@ -4875,7 +4875,7 @@ async fn avro_single_nan_schema() {
     let sql = "SELECT mycol FROM single_nan";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(plan).await.unwrap();
     for batch in results {
         assert_eq!(1, batch.num_rows());
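
The sql.rs changes above all follow one shape: logical planning and
optimization stay synchronous, while physical planning now happens on the
async side alongside execution. A condensed sketch of the resulting query flow
(the helper name is illustrative):

    use datafusion::error::Result;
    use datafusion::physical_plan::collect;
    use datafusion::prelude::ExecutionContext;

    async fn run_query(ctx: &mut ExecutionContext, sql: &str) -> Result<usize> {
        let plan = ctx.create_logical_plan(sql)?; // still sync
        let plan = ctx.optimize(&plan)?; // still sync
        let plan = ctx.create_physical_plan(&plan).await?; // async as of this commit
        let batches = collect(plan).await?; // execution was already async
        Ok(batches.iter().map(|b| b.num_rows()).sum())
    }
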
diff --git a/datafusion/tests/statistics.rs b/datafusion/tests/statistics.rs
index a2375ad..7a19aa7 100644
--- a/datafusion/tests/statistics.rs
+++ b/datafusion/tests/statistics.rs
@@ -59,6 +59,7 @@ impl StatisticsValidation {
     }
 }
 
+#[async_trait]
 impl TableProvider for StatisticsValidation {
     fn as_any(&self) -> &dyn Any {
         self
@@ -68,7 +69,7 @@ impl TableProvider for StatisticsValidation {
         Arc::clone(&self.schema)
     }
 
-    fn scan(
+    async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         _batch_size: usize,
@@ -212,7 +213,10 @@ async fn sql_basic() -> Result<()> {
 
     let df = ctx.sql("SELECT * from stats_table").unwrap();
 
-    let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+    let physical_plan = ctx
+        .create_physical_plan(&df.to_logical_plan())
+        .await
+        .unwrap();
 
     // the statistics should be those of the source
     assert_eq!(stats, physical_plan.statistics());
@@ -227,7 +231,10 @@ async fn sql_filter() -> Result<()> {
 
     let df = ctx.sql("SELECT * FROM stats_table WHERE c1 = 5").unwrap();
 
-    let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+    let physical_plan = ctx
+        .create_physical_plan(&df.to_logical_plan())
+        .await
+        .unwrap();
 
     // with a filtering condition we lose all knowledge about the statistics
     assert_eq!(Statistics::default(), physical_plan.statistics());
@@ -241,7 +248,10 @@ async fn sql_limit() -> Result<()> {
     let mut ctx = init_ctx(stats.clone(), schema)?;
 
     let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").unwrap();
-    let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+    let physical_plan = ctx
+        .create_physical_plan(&df.to_logical_plan())
+        .await
+        .unwrap();
     // when the limit is smaller than the original number of lines
     // we lose all statistics except for the number of rows, which becomes the limit
     assert_eq!(
@@ -254,7 +264,10 @@ async fn sql_limit() -> Result<()> {
     );
 
     let df = ctx.sql("SELECT * FROM stats_table LIMIT 100").unwrap();
-    let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+    let physical_plan = ctx
+        .create_physical_plan(&df.to_logical_plan())
+        .await
+        .unwrap();
     // when the limit is larger than the original number of lines, statistics remain unchanged
     assert_eq!(stats, physical_plan.statistics());
 
@@ -270,7 +283,10 @@ async fn sql_window() -> Result<()> {
         .sql("SELECT c2, sum(c1) over (partition by c2) FROM stats_table")
         .unwrap();
 
-    let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+    let physical_plan = ctx
+        .create_physical_plan(&df.to_logical_plan())
+        .await
+        .unwrap();
 
     let result = physical_plan.statistics();
 
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index d57a24c..14600f2 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -202,10 +202,11 @@ fn make_topk_context() -> ExecutionContext {
 
 struct TopKQueryPlanner {}
 
+#[async_trait]
 impl QueryPlanner for TopKQueryPlanner {
     /// Given a `LogicalPlan` created from above, create an
     /// `ExecutionPlan` suitable for execution
-    fn create_physical_plan(
+    async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
@@ -216,7 +217,9 @@ impl QueryPlanner for TopKQueryPlanner {
                 TopKPlanner {},
             )]);
         // Delegate most work of physical planning to the default physical planner
-        physical_planner.create_physical_plan(logical_plan, ctx_state)
+        physical_planner
+            .create_physical_plan(logical_plan, ctx_state)
+            .await
     }
 }
 
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index 0885ae3..48da234 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -122,14 +122,19 @@ impl DataFrame {
     /// Unless some order is specified in the plan, there is no guarantee of the order of the result
     fn collect(&self, py: Python) -> PyResult<PyObject> {
         let ctx = _ExecutionContext::from(self.ctx_state.clone());
+        let rt = Runtime::new().unwrap();
         let plan = ctx
             .optimize(&self.plan)
             .map_err(|e| -> errors::DataFusionError { e.into() })?;
-        let plan = ctx
-            .create_physical_plan(&plan)
-            .map_err(|e| -> errors::DataFusionError { e.into() })?;
 
-        let rt = Runtime::new().unwrap();
+        let plan = py.allow_threads(|| {
+            rt.block_on(async {
+                ctx.create_physical_plan(&plan)
+                    .await
+                    .map_err(|e| -> errors::DataFusionError { e.into() })
+            })
+        })?;
+
         let batches = py.allow_threads(|| {
             rt.block_on(async {
                 collect(plan)
@@ -144,12 +149,20 @@ impl DataFrame {
     #[args(num = "20")]
     fn show(&self, py: Python, num: usize) -> PyResult<()> {
         let ctx = _ExecutionContext::from(self.ctx_state.clone());
-        let plan = ctx
-            .optimize(&self.limit(num)?.plan)
-            .and_then(|plan| ctx.create_physical_plan(&plan))
-            .map_err(|e| -> errors::DataFusionError { e.into() })?;
-
         let rt = Runtime::new().unwrap();
+        let plan = py.allow_threads(|| {
+            rt.block_on(async {
+                let l_plan = ctx
+                    .optimize(&self.limit(num)?.plan)
+                    .map_err(|e| -> errors::DataFusionError { e.into() })?;
+                let p_plan = ctx
+                    .create_physical_plan(&l_plan)
+                    .await
+                    .map_err(|e| -> errors::DataFusionError { e.into() })?;
+                Ok::<_, PyErr>(p_plan)
+            })
+        })?;
+
         let batches = py.allow_threads(|| {
             rt.block_on(async {
                 collect(plan)