You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/04 16:09:09 UTC

[arrow-datafusion] branch master updated: Make ExecutionPlan sync (#2307) (#2434)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 807b7a5f7 Make ExecutionPlan sync (#2307) (#2434)
807b7a5f7 is described below

commit 807b7a5f7eb858e9f7162e1f00ffeeedd0bf2050
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed May 4 17:09:03 2022 +0100

    Make ExecutionPlan sync (#2307) (#2434)
---
 .../core/src/execution_plans/distributed_query.rs  |  4 +--
 .../core/src/execution_plans/shuffle_reader.rs     |  4 +--
 .../core/src/execution_plans/shuffle_writer.rs     | 10 +++---
 .../core/src/execution_plans/unresolved_shuffle.rs |  4 +--
 ballista/rust/core/src/serde/mod.rs                |  3 +-
 ballista/rust/executor/src/collect.rs              | 10 ++----
 datafusion-examples/examples/custom_datasource.rs  |  7 ++--
 datafusion/core/src/datasource/file_format/csv.rs  |  2 +-
 datafusion/core/src/datasource/file_format/json.rs |  2 +-
 .../core/src/datasource/file_format/parquet.rs     |  2 +-
 datafusion/core/src/datasource/memory.rs           | 10 +++---
 .../src/physical_optimizer/aggregate_statistics.rs |  2 +-
 .../core/src/physical_plan/aggregates/mod.rs       | 15 +++------
 datafusion/core/src/physical_plan/analyze.rs       |  6 ++--
 .../core/src/physical_plan/coalesce_batches.rs     |  8 ++---
 .../core/src/physical_plan/coalesce_partitions.rs  |  9 ++---
 datafusion/core/src/physical_plan/common.rs        |  2 +-
 datafusion/core/src/physical_plan/cross_join.rs    |  7 ++--
 datafusion/core/src/physical_plan/empty.rs         | 12 +++----
 datafusion/core/src/physical_plan/explain.rs       |  4 +--
 .../core/src/physical_plan/file_format/avro.rs     |  6 ++--
 .../core/src/physical_plan/file_format/csv.rs      | 14 ++++----
 .../core/src/physical_plan/file_format/json.rs     | 12 +++----
 .../core/src/physical_plan/file_format/parquet.rs  | 14 ++++----
 datafusion/core/src/physical_plan/filter.rs        |  8 ++---
 datafusion/core/src/physical_plan/hash_join.rs     | 34 +++++++++----------
 datafusion/core/src/physical_plan/limit.rs         | 13 +++-----
 datafusion/core/src/physical_plan/memory.rs        |  8 ++---
 datafusion/core/src/physical_plan/mod.rs           | 10 +++---
 datafusion/core/src/physical_plan/planner.rs       |  4 +--
 datafusion/core/src/physical_plan/projection.rs    |  8 ++---
 datafusion/core/src/physical_plan/repartition.rs   | 38 ++++++++++------------
 .../core/src/physical_plan/sort_merge_join.rs      | 12 +++----
 datafusion/core/src/physical_plan/sorts/sort.rs    |  5 ++-
 .../physical_plan/sorts/sort_preserving_merge.rs   | 29 ++++++-----------
 datafusion/core/src/physical_plan/union.rs         |  6 ++--
 datafusion/core/src/physical_plan/values.rs        |  4 +--
 .../src/physical_plan/windows/window_agg_exec.rs   |  6 ++--
 datafusion/core/src/scheduler/mod.rs               |  1 -
 .../core/src/scheduler/pipeline/execution.rs       | 29 +++--------------
 datafusion/core/src/test/exec.rs                   | 16 +++------
 datafusion/core/tests/custom_sources.rs            |  9 +++--
 datafusion/core/tests/provider_filter_pushdown.rs  |  3 +-
 datafusion/core/tests/statistics.rs                |  3 +-
 datafusion/core/tests/user_defined_plan.rs         |  4 +--
 45 files changed, 160 insertions(+), 259 deletions(-)

diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index ed3c9fceb..0b20acb47 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -41,7 +41,6 @@ use datafusion::physical_plan::{
 
 use crate::serde::protobuf::execute_query_params::OptionalSessionId;
 use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
-use async_trait::async_trait;
 use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::execution::context::TaskContext;
@@ -122,7 +121,6 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
     }
 }
 
-#[async_trait]
 impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
     fn as_any(&self) -> &dyn Any {
         self
@@ -162,7 +160,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
         }))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         _context: Arc<TaskContext>,
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 27252b980..3046a2276 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -21,7 +21,6 @@ use std::sync::Arc;
 use crate::client::BallistaClient;
 use crate::serde::scheduler::{PartitionLocation, PartitionStats};
 
-use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 
 use datafusion::error::{DataFusionError, Result};
@@ -64,7 +63,6 @@ impl ShuffleReaderExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for ShuffleReaderExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -101,7 +99,7 @@ impl ExecutionPlan for ShuffleReaderExec {
         ))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         _context: Arc<TaskContext>,
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index f5c98b200..45a102185 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -33,7 +33,6 @@ use crate::utils;
 
 use crate::serde::protobuf::ShuffleWritePartition;
 use crate::serde::scheduler::PartitionStats;
-use async_trait::async_trait;
 use datafusion::arrow::array::{
     ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder, UInt64Builder,
 };
@@ -155,7 +154,7 @@ impl ShuffleWriterExec {
 
         async move {
             let now = Instant::now();
-            let mut stream = plan.execute(input_partition, context).await?;
+            let mut stream = plan.execute(input_partition, context)?;
 
             match output_partitioning {
                 None => {
@@ -293,7 +292,6 @@ impl ShuffleWriterExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for ShuffleWriterExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -336,7 +334,7 @@ impl ExecutionPlan for ShuffleWriterExec {
         )?))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -459,7 +457,7 @@ mod tests {
             work_dir.into_path().to_str().unwrap().to_owned(),
             Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
         )?;
-        let mut stream = query_stage.execute(0, task_ctx).await?;
+        let mut stream = query_stage.execute(0, task_ctx)?;
         let batches = utils::collect_stream(&mut stream)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -516,7 +514,7 @@ mod tests {
             work_dir.into_path().to_str().unwrap().to_owned(),
             Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
         )?;
-        let mut stream = query_stage.execute(0, task_ctx).await?;
+        let mut stream = query_stage.execute(0, task_ctx)?;
         let batches = utils::collect_stream(&mut stream)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index e1eecdd92..15d403fb6 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -18,7 +18,6 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
@@ -63,7 +62,6 @@ impl UnresolvedShuffleExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for UnresolvedShuffleExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -101,7 +99,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
         ))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         _partition: usize,
         _context: Arc<TaskContext>,
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index ed41ce61c..bc2d7ff6b 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -477,7 +477,6 @@ mod tests {
         }
     }
 
-    #[async_trait]
     impl ExecutionPlan for TopKExec {
         /// Return a reference to Any that can be used for downcasting
         fn as_any(&self) -> &dyn Any {
@@ -515,7 +514,7 @@ mod tests {
         }
 
         /// Execute one partition and return an iterator over RecordBatch
-        async fn execute(
+        fn execute(
             &self,
             _partition: usize,
             _context: Arc<TaskContext>,
diff --git a/ballista/rust/executor/src/collect.rs b/ballista/rust/executor/src/collect.rs
index 1bb4acaf8..54e97550a 100644
--- a/ballista/rust/executor/src/collect.rs
+++ b/ballista/rust/executor/src/collect.rs
@@ -22,7 +22,6 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::{any::Any, pin::Pin};
 
-use async_trait::async_trait;
 use datafusion::arrow::{
     datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
 };
@@ -49,7 +48,6 @@ impl CollectExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for CollectExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -78,7 +76,7 @@ impl ExecutionPlan for CollectExec {
         unimplemented!()
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -86,10 +84,8 @@ impl ExecutionPlan for CollectExec {
         assert_eq!(0, partition);
         let num_partitions = self.plan.output_partitioning().partition_count();
 
-        let futures = (0..num_partitions).map(|i| self.plan.execute(i, context.clone()));
-        let streams = futures::future::join_all(futures)
-            .await
-            .into_iter()
+        let streams = (0..num_partitions)
+            .map(|i| self.plan.execute(i, context.clone()))
             .collect::<Result<Vec<_>>>()
             .map_err(|e| DataFusionError::Execution(format!("BallistaError: {:?}", e)))?;
 
diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index a4b9fda1a..d8a908986 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -196,7 +196,6 @@ impl CustomExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for CustomExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -225,7 +224,7 @@ impl ExecutionPlan for CustomExec {
         Ok(self)
     }
 
-    async fn execute(
+    fn execute(
         &self,
         _partition: usize,
         _context: Arc<TaskContext>,
@@ -243,7 +242,7 @@ impl ExecutionPlan for CustomExec {
             account_array.append_value(user.bank_account)?;
         }
 
-        return Ok(Box::pin(MemoryStream::try_new(
+        Ok(Box::pin(MemoryStream::try_new(
             vec![RecordBatch::try_new(
                 self.projected_schema.clone(),
                 vec![
@@ -253,7 +252,7 @@ impl ExecutionPlan for CustomExec {
             )?],
             self.schema(),
             None,
-        )?));
+        )?))
     }
 
     fn statistics(&self) -> Statistics {
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 87bc6be1a..cbc2adca1 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -161,7 +161,7 @@ mod tests {
         let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
         let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
         let task_ctx = ctx.task_ctx();
-        let stream = exec.execute(0, task_ctx).await?;
+        let stream = exec.execute(0, task_ctx)?;
 
         let tt_batches: i32 = stream
             .map(|batch| {
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index c56b84353..cd4fd5810 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -131,7 +131,7 @@ mod tests {
         let projection = None;
         let exec = get_exec(&projection, None).await?;
         let task_ctx = ctx.task_ctx();
-        let stream = exec.execute(0, task_ctx).await?;
+        let stream = exec.execute(0, task_ctx)?;
 
         let tt_batches: i32 = stream
             .map(|batch| {
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index daadff97d..a9a0c788b 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -512,7 +512,7 @@ mod tests {
         let projection = None;
         let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
         let task_ctx = ctx.task_ctx();
-        let stream = exec.execute(0, task_ctx).await?;
+        let stream = exec.execute(0, task_ctx)?;
 
         let tt_batches = stream
             .map(|batch| {
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 90e429b18..72630b39c 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -76,7 +76,7 @@ impl MemTable {
                 let context1 = context.clone();
                 let exec = exec.clone();
                 tokio::spawn(async move {
-                    let stream = exec.execute(part_i, context1.clone()).await?;
+                    let stream = exec.execute(part_i, context1.clone())?;
                     common::collect(stream).await
                 })
             })
@@ -103,7 +103,7 @@ impl MemTable {
             let mut output_partitions = vec![];
             for i in 0..exec.output_partitioning().partition_count() {
                 // execute this *output* partition and collect all batches
-                let mut stream = exec.execute(i, context.clone()).await?;
+                let mut stream = exec.execute(i, context.clone())?;
                 let mut batches = vec![];
                 while let Some(result) = stream.next().await {
                     batches.push(result?);
@@ -177,7 +177,7 @@ mod tests {
 
         // scan with projection
         let exec = provider.scan(&Some(vec![2, 1]), &[], None).await?;
-        let mut it = exec.execute(0, task_ctx).await?;
+        let mut it = exec.execute(0, task_ctx)?;
         let batch2 = it.next().await.unwrap()?;
         assert_eq!(2, batch2.schema().fields().len());
         assert_eq!("c", batch2.schema().field(0).name());
@@ -209,7 +209,7 @@ mod tests {
         let provider = MemTable::try_new(schema, vec![vec![batch]])?;
 
         let exec = provider.scan(&None, &[], None).await?;
-        let mut it = exec.execute(0, task_ctx).await?;
+        let mut it = exec.execute(0, task_ctx)?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
         assert_eq!(3, batch1.num_columns());
@@ -365,7 +365,7 @@ mod tests {
             MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;
 
         let exec = provider.scan(&None, &[], None).await?;
-        let mut it = exec.execute(0, task_ctx).await?;
+        let mut it = exec.execute(0, task_ctx)?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(3, batch1.schema().fields().len());
         assert_eq!(3, batch1.num_columns());
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index 9c548ab9b..4cf96d235 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -304,7 +304,7 @@ mod tests {
 
         // A ProjectionExec is a sign that the count optimization was applied
         assert!(optimized.as_any().is::<ProjectionExec>());
-        let result = common::collect(optimized.execute(0, task_ctx).await?).await?;
+        let result = common::collect(optimized.execute(0, task_ctx)?).await?;
         assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));
         assert_eq!(
             result[0]
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 5e1da793c..3682ec6eb 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -29,7 +29,6 @@ use crate::physical_plan::{
 };
 use arrow::array::ArrayRef;
 use arrow::datatypes::{Field, Schema, SchemaRef};
-use async_trait::async_trait;
 use datafusion_common::Result;
 use datafusion_expr::Accumulator;
 use datafusion_physical_expr::expressions::Column;
@@ -145,7 +144,6 @@ impl AggregateExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for AggregateExec {
     /// Return a reference to Any that can be used for down-casting
     fn as_any(&self) -> &dyn Any {
@@ -196,12 +194,12 @@ impl ExecutionPlan for AggregateExec {
         )?))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let input = self.input.execute(partition, context).await?;
+        let input = self.input.execute(partition, context)?;
         let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
 
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
@@ -417,7 +415,6 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use arrow::error::Result as ArrowResult;
     use arrow::record_batch::RecordBatch;
-    use async_trait::async_trait;
     use datafusion_common::{DataFusionError, Result};
     use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, PhysicalSortExpr};
     use futures::{FutureExt, Stream};
@@ -489,8 +486,7 @@ mod tests {
         )?);
 
         let result =
-            common::collect(partial_aggregate.execute(0, task_ctx.clone()).await?)
-                .await?;
+            common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?;
 
         let expected = vec![
             "+---+---------------+-------------+",
@@ -522,7 +518,7 @@ mod tests {
         )?);
 
         let result =
-            common::collect(merged_aggregate.execute(0, task_ctx.clone()).await?).await?;
+            common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?;
         assert_eq!(result.len(), 1);
 
         let batch = &result[0];
@@ -556,7 +552,6 @@ mod tests {
         pub yield_first: bool,
     }
 
-    #[async_trait]
     impl ExecutionPlan for TestYieldingExec {
         fn as_any(&self) -> &dyn Any {
             self
@@ -587,7 +582,7 @@ mod tests {
             )))
         }
 
-        async fn execute(
+        fn execute(
             &self,
             _partition: usize,
             _context: Arc<TaskContext>,
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index f8050f16c..c2f08c69a 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -33,7 +33,6 @@ use futures::StreamExt;
 use super::expressions::PhysicalSortExpr;
 use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
 use crate::execution::context::TaskContext;
-use async_trait::async_trait;
 
 /// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
 /// discards the results, and then prints out an annotated plan with metrics
@@ -58,7 +57,6 @@ impl AnalyzeExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for AnalyzeExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -102,7 +100,7 @@ impl ExecutionPlan for AnalyzeExec {
         )))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -126,7 +124,7 @@ impl ExecutionPlan for AnalyzeExec {
         let (tx, rx) = tokio::sync::mpsc::channel(input_partitions);
 
         let captured_input = self.input.clone();
-        let mut input_stream = captured_input.execute(0, context).await?;
+        let mut input_stream = captured_input.execute(0, context)?;
         let captured_schema = self.schema.clone();
         let verbose = self.verbose;
 
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 482b0ee77..75ecaf53e 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -34,7 +34,6 @@ use arrow::compute::kernels::concat::concat;
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
 use futures::stream::{Stream, StreamExt};
 use log::debug;
 
@@ -75,7 +74,6 @@ impl CoalesceBatchesExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for CoalesceBatchesExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -116,13 +114,13 @@ impl ExecutionPlan for CoalesceBatchesExec {
         )))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         Ok(Box::pin(CoalesceBatchesStream {
-            input: self.input.execute(partition, context).await?,
+            input: self.input.execute(partition, context)?,
             schema: self.input.schema(),
             target_batch_size: self.target_batch_size,
             buffer: Vec::new(),
@@ -348,7 +346,7 @@ mod tests {
         for i in 0..output_partition_count {
             // execute this *output* partition and collect all batches
             let task_ctx = session_ctx.task_ctx();
-            let mut stream = exec.execute(i, task_ctx.clone()).await?;
+            let mut stream = exec.execute(i, task_ctx.clone())?;
             let mut batches = vec![];
             while let Some(result) = stream.next().await {
                 batches.push(result?);
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index 35f75db03..11fcd5d50 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -25,8 +25,6 @@ use std::task::Poll;
 use futures::Stream;
 use tokio::sync::mpsc;
 
-use async_trait::async_trait;
-
 use arrow::record_batch::RecordBatch;
 use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};
 
@@ -66,7 +64,6 @@ impl CoalescePartitionsExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for CoalescePartitionsExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -101,7 +98,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
         Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone())))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -121,7 +118,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
             )),
             1 => {
                 // bypass any threading / metrics if there is a single partition
-                self.input.execute(0, context).await
+                self.input.execute(0, context)
             }
             _ => {
                 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
@@ -252,7 +249,7 @@ mod tests {
         assert_eq!(merge.output_partitioning().partition_count(), 1);
 
         // the result should contain 4 batches (one per input partition)
-        let iter = merge.execute(0, task_ctx).await?;
+        let iter = merge.execute(0, task_ctx)?;
         let batches = common::collect(iter).await?;
         assert_eq!(batches.len(), num_partitions);
 
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 68bd676dd..24df647dc 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -179,7 +179,7 @@ pub(crate) fn spawn_execution(
     context: Arc<TaskContext>,
 ) -> JoinHandle<()> {
     tokio::spawn(async move {
-        let mut stream = match input.execute(partition, context).await {
+        let mut stream = match input.execute(partition, context) {
             Err(e) => {
                 // If send fails, plan being torn
                 // down, no place to send the error
diff --git a/datafusion/core/src/physical_plan/cross_join.rs b/datafusion/core/src/physical_plan/cross_join.rs
index 2846af9c5..e3f25fc56 100644
--- a/datafusion/core/src/physical_plan/cross_join.rs
+++ b/datafusion/core/src/physical_plan/cross_join.rs
@@ -111,7 +111,7 @@ async fn load_left_input(
 
     // merge all left parts into a single stream
     let merge = CoalescePartitionsExec::new(left.clone());
-    let stream = merge.execute(0, context).await?;
+    let stream = merge.execute(0, context)?;
 
     // Load all batches and count the rows
     let (batches, num_rows) = stream
@@ -133,7 +133,6 @@ async fn load_left_input(
     Ok(merged_batch)
 }
 
-#[async_trait]
 impl ExecutionPlan for CrossJoinExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -169,12 +168,12 @@ impl ExecutionPlan for CrossJoinExec {
         false
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let stream = self.right.execute(partition, context.clone()).await?;
+        let stream = self.right.execute(partition, context.clone())?;
 
         let left_fut = self
             .left_fut
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index 43e749e1d..bba87e190 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -33,7 +33,6 @@ use super::expressions::PhysicalSortExpr;
 use super::{common, SendableRecordBatchStream, Statistics};
 
 use crate::execution::context::TaskContext;
-use async_trait::async_trait;
 
 /// Execution plan for empty relation (produces no rows)
 #[derive(Debug)]
@@ -76,7 +75,6 @@ impl EmptyExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for EmptyExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -114,7 +112,7 @@ impl ExecutionPlan for EmptyExec {
         )))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -172,7 +170,7 @@ mod tests {
         assert_eq!(empty.schema(), schema);
 
         // we should have no results
-        let iter = empty.execute(0, task_ctx).await?;
+        let iter = empty.execute(0, task_ctx)?;
         let batches = common::collect(iter).await?;
         assert!(batches.is_empty());
 
@@ -208,8 +206,8 @@ mod tests {
         let empty = EmptyExec::new(false, schema);
 
         // ask for the wrong partition
-        assert!(empty.execute(1, task_ctx.clone()).await.is_err());
-        assert!(empty.execute(20, task_ctx).await.is_err());
+        assert!(empty.execute(1, task_ctx.clone()).is_err());
+        assert!(empty.execute(20, task_ctx).is_err());
         Ok(())
     }
 
@@ -220,7 +218,7 @@ mod tests {
         let schema = test_util::aggr_test_schema();
         let empty = EmptyExec::new(true, schema);
 
-        let iter = empty.execute(0, task_ctx).await?;
+        let iter = empty.execute(0, task_ctx)?;
         let batches = common::collect(iter).await?;
 
         // should have one item
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index fb6631588..fdc139a7e 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -34,7 +34,6 @@ use log::debug;
 use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
 use crate::execution::context::TaskContext;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
-use async_trait::async_trait;
 
 /// Explain execution plan operator. This operator contains the string
 /// values of the various plans it has when it is created, and passes
@@ -74,7 +73,6 @@ impl ExplainExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for ExplainExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -110,7 +108,7 @@ impl ExecutionPlan for ExplainExec {
         Ok(self)
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 68f8f2f90..eed0161a2 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -28,7 +28,6 @@ use arrow::datatypes::SchemaRef;
 use arrow::error::ArrowError;
 
 use crate::execution::context::TaskContext;
-use async_trait::async_trait;
 use std::any::Any;
 use std::sync::Arc;
 
@@ -61,7 +60,6 @@ impl AvroExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for AvroExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -95,7 +93,7 @@ impl ExecutionPlan for AvroExec {
     }
 
     #[cfg(not(feature = "avro"))]
-    async fn execute(
+    fn execute(
         &self,
         _partition: usize,
         _context: Arc<TaskContext>,
@@ -106,7 +104,7 @@ impl ExecutionPlan for AvroExec {
     }
 
     #[cfg(feature = "avro")]
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 8aea607ea..96bceb225 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -26,7 +26,6 @@ use crate::physical_plan::{
 
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
-use async_trait::async_trait;
 use futures::{StreamExt, TryStreamExt};
 use std::any::Any;
 use std::fs;
@@ -75,7 +74,6 @@ impl CsvExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for CsvExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -112,7 +110,7 @@ impl ExecutionPlan for CsvExec {
         Ok(self)
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -191,7 +189,7 @@ pub async fn plan_to_csv(
                 let file = fs::File::create(path)?;
                 let mut writer = csv::Writer::new(file);
                 let task_ctx = Arc::new(TaskContext::from(state));
-                let stream = plan.execute(i, task_ctx).await?;
+                let stream = plan.execute(i, task_ctx)?;
                 let handle: JoinHandle<Result<()>> = task::spawn(async move {
                     stream
                         .map(|batch| writer.write(&batch?))
@@ -250,7 +248,7 @@ mod tests {
         assert_eq!(3, csv.projected_schema.fields().len());
         assert_eq!(3, csv.schema().fields().len());
 
-        let mut stream = csv.execute(0, task_ctx).await?;
+        let mut stream = csv.execute(0, task_ctx)?;
         let batch = stream.next().await.unwrap()?;
         assert_eq!(3, batch.num_columns());
         assert_eq!(100, batch.num_rows());
@@ -297,7 +295,7 @@ mod tests {
         assert_eq!(13, csv.projected_schema.fields().len());
         assert_eq!(13, csv.schema().fields().len());
 
-        let mut it = csv.execute(0, task_ctx).await?;
+        let mut it = csv.execute(0, task_ctx)?;
         let batch = it.next().await.unwrap()?;
         assert_eq!(13, batch.num_columns());
         assert_eq!(5, batch.num_rows());
@@ -344,7 +342,7 @@ mod tests {
         assert_eq!(14, csv.projected_schema.fields().len());
         assert_eq!(14, csv.schema().fields().len());
 
-        let mut it = csv.execute(0, task_ctx).await?;
+        let mut it = csv.execute(0, task_ctx)?;
         let batch = it.next().await.unwrap()?;
         assert_eq!(14, batch.num_columns());
         assert_eq!(5, batch.num_rows());
@@ -398,7 +396,7 @@ mod tests {
         assert_eq!(2, csv.projected_schema.fields().len());
         assert_eq!(2, csv.schema().fields().len());
 
-        let mut it = csv.execute(0, task_ctx).await?;
+        let mut it = csv.execute(0, task_ctx)?;
         let batch = it.next().await.unwrap()?;
         assert_eq!(2, batch.num_columns());
         assert_eq!(100, batch.num_rows());
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 5c02a9c92..818496d13 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -17,7 +17,6 @@
 
 //! Execution plan for reading line-delimited JSON files
 use arrow::json::reader::DecoderOptions;
-use async_trait::async_trait;
 
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
@@ -58,7 +57,6 @@ impl NdJsonExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for NdJsonExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -91,7 +89,7 @@ impl ExecutionPlan for NdJsonExec {
         Ok(self)
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -168,7 +166,7 @@ pub async fn plan_to_json(
                 let file = fs::File::create(path)?;
                 let mut writer = json::LineDelimitedWriter::new(file);
                 let task_ctx = Arc::new(TaskContext::from(state));
-                let stream = plan.execute(i, task_ctx).await?;
+                let stream = plan.execute(i, task_ctx)?;
                 let handle: JoinHandle<Result<()>> = task::spawn(async move {
                     stream
                         .map(|batch| writer.write(batch?))
@@ -255,7 +253,7 @@ mod tests {
             &DataType::Utf8
         );
 
-        let mut it = exec.execute(0, task_ctx).await?;
+        let mut it = exec.execute(0, task_ctx)?;
         let batch = it.next().await.unwrap()?;
 
         assert_eq!(batch.num_rows(), 3);
@@ -296,7 +294,7 @@ mod tests {
             table_partition_cols: vec![],
         });
 
-        let mut it = exec.execute(0, task_ctx).await?;
+        let mut it = exec.execute(0, task_ctx)?;
         let batch = it.next().await.unwrap()?;
 
         assert_eq!(batch.num_rows(), 3);
@@ -335,7 +333,7 @@ mod tests {
         inferred_schema.field_with_name("c").unwrap();
         inferred_schema.field_with_name("d").unwrap_err();
 
-        let mut it = exec.execute(0, task_ctx).await?;
+        let mut it = exec.execute(0, task_ctx)?;
         let batch = it.next().await.unwrap()?;
 
         assert_eq!(batch.num_rows(), 4);
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index df7327aa0..d2e156f32 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -33,7 +33,6 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use async_trait::async_trait;
 use futures::{Stream, StreamExt, TryStreamExt};
 use log::debug;
 use parquet::arrow::{
@@ -165,7 +164,6 @@ impl ParquetFileMetrics {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for ParquetExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -201,7 +199,7 @@ impl ExecutionPlan for ParquetExec {
         Ok(self)
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition_index: usize,
         context: Arc<TaskContext>,
@@ -592,7 +590,7 @@ pub async fn plan_to_parquet(
                     writer_properties.clone(),
                 )?;
                 let task_ctx = Arc::new(TaskContext::from(state));
-                let stream = plan.execute(i, task_ctx).await?;
+                let stream = plan.execute(i, task_ctx)?;
                 let handle: tokio::task::JoinHandle<Result<()>> =
                     tokio::task::spawn(async move {
                         stream
@@ -1059,7 +1057,7 @@ mod tests {
         );
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
 
-        let mut results = parquet_exec.execute(0, task_ctx).await?;
+        let mut results = parquet_exec.execute(0, task_ctx)?;
         let batch = results.next().await.unwrap()?;
 
         assert_eq!(8, batch.num_rows());
@@ -1111,7 +1109,7 @@ mod tests {
                 None,
             );
             assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
-            let results = parquet_exec.execute(0, task_ctx).await?.next().await;
+            let results = parquet_exec.execute(0, task_ctx)?.next().await;
 
             if let Some(expected_row_num) = expected_row_num {
                 let batch = results.unwrap()?;
@@ -1190,7 +1188,7 @@ mod tests {
         );
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
 
-        let mut results = parquet_exec.execute(0, task_ctx).await?;
+        let mut results = parquet_exec.execute(0, task_ctx)?;
         let batch = results.next().await.unwrap()?;
         let expected = vec![
             "+----+----------+-------------+-------+",
@@ -1247,7 +1245,7 @@ mod tests {
             None,
         );
 
-        let mut results = parquet_exec.execute(0, task_ctx).await?;
+        let mut results = parquet_exec.execute(0, task_ctx)?;
         let batch = results.next().await.unwrap();
         // invalid file should produce an error to that effect
         assert_contains!(
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 158bedf28..4aa6453bf 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -36,7 +36,6 @@ use arrow::datatypes::{DataType, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
-use async_trait::async_trait;
 use log::debug;
 
 use crate::execution::context::TaskContext;
@@ -84,7 +83,6 @@ impl FilterExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for FilterExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -129,7 +127,7 @@ impl ExecutionPlan for FilterExec {
         )?))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -137,9 +135,9 @@ impl ExecutionPlan for FilterExec {
         debug!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         Ok(Box::pin(FilterExecStream {
-            schema: self.input.schema().clone(),
+            schema: self.input.schema(),
             predicate: self.predicate.clone(),
-            input: self.input.execute(partition, context).await?,
+            input: self.input.execute(partition, context)?,
             baseline_metrics,
         }))
     }
diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs
index 31882c63c..8a4a342c1 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/hash_join.rs
@@ -34,7 +34,6 @@ use std::sync::Arc;
 use std::{any::Any, usize};
 use std::{time::Instant, vec};
 
-use async_trait::async_trait;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 
 use arrow::array::{new_null_array, Array};
@@ -250,7 +249,6 @@ impl HashJoinExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for HashJoinExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -290,7 +288,7 @@ impl ExecutionPlan for HashJoinExec {
         false
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -318,7 +316,7 @@ impl ExecutionPlan for HashJoinExec {
 
         // we have the batches and the hash map with their keys. We can how create a stream
         // over the right that uses this information to issue new batches.
-        let right_stream = self.right.execute(partition, context).await?;
+        let right_stream = self.right.execute(partition, context)?;
 
         Ok(Box::pin(HashJoinStream {
             schema: self.schema(),
@@ -375,7 +373,7 @@ async fn collect_left_input(
 
     // merge all left parts into a single stream
     let merge = CoalescePartitionsExec::new(left);
-    let stream = merge.execute(0, context).await?;
+    let stream = merge.execute(0, context)?;
 
     // This operation performs 2 steps at once:
     // 1. creates a [JoinHashMap] of all batches from the stream
@@ -430,7 +428,7 @@ async fn partitioned_left_input(
     let start = Instant::now();
 
     // Load 1 partition of left side in memory
-    let stream = left.execute(partition, context.clone()).await?;
+    let stream = left.execute(partition, context.clone())?;
 
     // This operation performs 2 steps at once:
     // 1. creates a [JoinHashMap] of all batches from the stream
@@ -1122,7 +1120,7 @@ mod tests {
         let join = join(left, right, on, join_type, null_equals_null)?;
         let columns = columns(&join.schema());
 
-        let stream = join.execute(0, context).await?;
+        let stream = join.execute(0, context)?;
         let batches = common::collect(stream).await?;
 
         Ok((columns, batches))
@@ -1167,7 +1165,7 @@ mod tests {
 
         let mut batches = vec![];
         for i in 0..partition_count {
-            let stream = join.execute(i, context.clone()).await?;
+            let stream = join.execute(i, context.clone())?;
             let more_batches = common::collect(stream).await?;
             batches.extend(
                 more_batches
@@ -1446,7 +1444,7 @@ mod tests {
         assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
 
         // first part
-        let stream = join.execute(0, task_ctx.clone()).await?;
+        let stream = join.execute(0, task_ctx.clone())?;
         let batches = common::collect(stream).await?;
         assert_eq!(batches.len(), 1);
 
@@ -1460,7 +1458,7 @@ mod tests {
         assert_batches_sorted_eq!(expected, &batches);
 
         // second part
-        let stream = join.execute(1, task_ctx.clone()).await?;
+        let stream = join.execute(1, task_ctx.clone())?;
         let batches = common::collect(stream).await?;
         assert_eq!(batches.len(), 1);
         let expected = vec![
@@ -1513,7 +1511,7 @@ mod tests {
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
 
-        let stream = join.execute(0, task_ctx).await.unwrap();
+        let stream = join.execute(0, task_ctx).unwrap();
         let batches = common::collect(stream).await.unwrap();
 
         let expected = vec![
@@ -1556,7 +1554,7 @@ mod tests {
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
 
-        let stream = join.execute(0, task_ctx).await.unwrap();
+        let stream = join.execute(0, task_ctx).unwrap();
         let batches = common::collect(stream).await.unwrap();
 
         let expected = vec![
@@ -1597,7 +1595,7 @@ mod tests {
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
 
-        let stream = join.execute(0, task_ctx).await.unwrap();
+        let stream = join.execute(0, task_ctx).unwrap();
         let batches = common::collect(stream).await.unwrap();
 
         let expected = vec![
@@ -1634,7 +1632,7 @@ mod tests {
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
 
-        let stream = join.execute(0, task_ctx).await.unwrap();
+        let stream = join.execute(0, task_ctx).unwrap();
         let batches = common::collect(stream).await.unwrap();
 
         let expected = vec![
@@ -1762,7 +1760,7 @@ mod tests {
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1"]);
 
-        let stream = join.execute(0, task_ctx).await?;
+        let stream = join.execute(0, task_ctx)?;
         let batches = common::collect(stream).await?;
 
         let expected = vec![
@@ -1803,7 +1801,7 @@ mod tests {
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1"]);
 
-        let stream = join.execute(0, task_ctx).await?;
+        let stream = join.execute(0, task_ctx)?;
         let batches = common::collect(stream).await?;
 
         let expected = vec![
@@ -1921,7 +1919,7 @@ mod tests {
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
 
-        let stream = join.execute(0, task_ctx).await?;
+        let stream = join.execute(0, task_ctx)?;
         let batches = common::collect(stream).await?;
 
         let expected = vec![
@@ -2014,7 +2012,7 @@ mod tests {
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
 
-        let stream = join.execute(0, task_ctx).await?;
+        let stream = join.execute(0, task_ctx)?;
         let batches = common::collect(stream).await?;
 
         let expected = vec![
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 71e08afd2..00516db0b 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -43,7 +43,6 @@ use super::{
 };
 
 use crate::execution::context::TaskContext;
-use async_trait::async_trait;
 
 /// Limit execution plan
 #[derive(Debug)]
@@ -77,7 +76,6 @@ impl GlobalLimitExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for GlobalLimitExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -127,7 +125,7 @@ impl ExecutionPlan for GlobalLimitExec {
         )))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -152,7 +150,7 @@ impl ExecutionPlan for GlobalLimitExec {
         }
 
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-        let stream = self.input.execute(0, context).await?;
+        let stream = self.input.execute(0, context)?;
         Ok(Box::pin(LimitStream::new(
             stream,
             self.limit,
@@ -230,7 +228,6 @@ impl LocalLimitExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for LocalLimitExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -282,14 +279,14 @@ impl ExecutionPlan for LocalLimitExec {
         }
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         debug!("Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-        let stream = self.input.execute(partition, context).await?;
+        let stream = self.input.execute(partition, context)?;
         Ok(Box::pin(LimitStream::new(
             stream,
             self.limit,
@@ -467,7 +464,7 @@ mod tests {
             GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7);
 
         // the result should contain 4 batches (one per input partition)
-        let iter = limit.execute(0, task_ctx).await?;
+        let iter = limit.execute(0, task_ctx)?;
         let batches = common::collect(iter).await?;
 
         // there should be a total of 100 rows
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index 2862aefdb..6a0af7650 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -33,7 +33,6 @@ use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
 use crate::execution::context::TaskContext;
-use async_trait::async_trait;
 use datafusion_common::DataFusionError;
 use futures::Stream;
 
@@ -57,7 +56,6 @@ impl fmt::Debug for MemoryExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for MemoryExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -97,7 +95,7 @@ impl ExecutionPlan for MemoryExec {
         )))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         _context: Arc<TaskContext>,
@@ -279,7 +277,7 @@ mod tests {
         );
 
         // scan with projection
-        let mut it = executor.execute(0, task_ctx).await?;
+        let mut it = executor.execute(0, task_ctx)?;
         let batch2 = it.next().await.unwrap()?;
         assert_eq!(2, batch2.schema().fields().len());
         assert_eq!("c", batch2.schema().field(0).name());
@@ -329,7 +327,7 @@ mod tests {
             ])
         );
 
-        let mut it = executor.execute(0, task_ctx).await?;
+        let mut it = executor.execute(0, task_ctx)?;
         let batch1 = it.next().await.unwrap()?;
         assert_eq!(4, batch1.schema().fields().len());
         assert_eq!(4, batch1.num_columns());
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index dc963c7e1..feb0bf322 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -29,7 +29,6 @@ use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
-use async_trait::async_trait;
 pub use datafusion_expr::Accumulator;
 pub use datafusion_expr::ColumnarValue;
 pub use display::DisplayFormatType;
@@ -128,7 +127,6 @@ pub struct ColumnStatistics {
 /// [`ExecutionPlan`] can be displayed in an simplified form using the
 /// return value from [`displayable`] in addition to the (normally
 /// quite verbose) `Debug` output.
-#[async_trait]
 pub trait ExecutionPlan: Debug + Send + Sync {
     /// Returns the execution plan as [`Any`](std::any::Any) so that it can be
     /// downcast to a specific implementation.
@@ -223,7 +221,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     ) -> Result<Arc<dyn ExecutionPlan>>;
 
     /// creates an iterator
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -426,13 +424,13 @@ pub async fn execute_stream(
 ) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
         0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
-        1 => plan.execute(0, context).await,
+        1 => plan.execute(0, context),
         _ => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(plan.clone());
             // CoalescePartitionsExec must produce a single partition
             assert_eq!(1, plan.output_partitioning().partition_count());
-            plan.execute(0, context).await
+            plan.execute(0, context)
         }
     }
 }
@@ -458,7 +456,7 @@ pub async fn execute_stream_partitioned(
     let num_partitions = plan.output_partitioning().partition_count();
     let mut streams = Vec::with_capacity(num_partitions);
     for i in 0..num_partitions {
-        streams.push(plan.execute(i, context.clone()).await?);
+        streams.push(plan.execute(i, context.clone())?);
     }
     Ok(streams)
 }
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index b58029eb5..85fb7d424 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1510,7 +1510,6 @@ mod tests {
         logical_plan::LogicalPlanBuilder, physical_plan::SendableRecordBatchStream,
     };
     use arrow::datatypes::{DataType, Field, SchemaRef};
-    use async_trait::async_trait;
     use datafusion_common::{DFField, DFSchema, DFSchemaRef};
     use datafusion_expr::sum;
     use datafusion_expr::{col, lit};
@@ -2000,7 +1999,6 @@ mod tests {
         schema: SchemaRef,
     }
 
-    #[async_trait]
     impl ExecutionPlan for NoOpExecutionPlan {
         /// Return a reference to Any that can be used for downcasting
         fn as_any(&self) -> &dyn Any {
@@ -2034,7 +2032,7 @@ mod tests {
             unimplemented!("NoOpExecutionPlan::with_new_children");
         }
 
-        async fn execute(
+        fn execute(
             &self,
             _partition: usize,
             _context: Arc<TaskContext>,
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 554dda12c..8e8a1ee54 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -39,7 +39,6 @@ use super::expressions::{Column, PhysicalSortExpr};
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
 use crate::execution::context::TaskContext;
-use async_trait::async_trait;
 use futures::stream::Stream;
 use futures::stream::StreamExt;
 
@@ -102,7 +101,6 @@ impl ProjectionExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for ProjectionExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -146,7 +144,7 @@ impl ExecutionPlan for ProjectionExec {
         )?))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -155,7 +153,7 @@ impl ExecutionPlan for ProjectionExec {
         Ok(Box::pin(ProjectionStream {
             schema: self.schema.clone(),
             expr: self.expr.iter().map(|x| x.0.clone()).collect(),
-            input: self.input.execute(partition, context).await?,
+            input: self.input.execute(partition, context)?,
             baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
         }))
     }
@@ -345,7 +343,7 @@ mod tests {
         let mut row_count = 0;
         for partition in 0..projection.output_partitioning().partition_count() {
             partition_count += 1;
-            let stream = projection.execute(partition, task_ctx.clone()).await?;
+            let stream = projection.execute(partition, task_ctx.clone())?;
 
             row_count += stream
                 .map(|batch| {
diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs
index 379555396..82efe2c4f 100644
--- a/datafusion/core/src/physical_plan/repartition.rs
+++ b/datafusion/core/src/physical_plan/repartition.rs
@@ -37,17 +37,14 @@ use super::common::{AbortOnDropMany, AbortOnDropSingle};
 use super::expressions::PhysicalSortExpr;
 use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream};
-use async_trait::async_trait;
 
 use crate::execution::context::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
 use futures::stream::Stream;
 use futures::StreamExt;
 use hashbrown::HashMap;
-use tokio::sync::{
-    mpsc::{self, UnboundedReceiver, UnboundedSender},
-    Mutex,
-};
+use parking_lot::Mutex;
+use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
 use tokio::task::JoinHandle;
 
 type MaybeBatch = Option<ArrowResult<RecordBatch>>;
@@ -261,7 +258,6 @@ impl RepartitionExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for RepartitionExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -299,7 +295,7 @@ impl ExecutionPlan for RepartitionExec {
         None
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -309,7 +305,7 @@ impl ExecutionPlan for RepartitionExec {
             partition
         );
         // lock mutexes
-        let mut state = self.state.lock().await;
+        let mut state = self.state.lock();
 
         let num_input_partitions = self.input.output_partitioning().partition_count();
         let num_output_partitions = self.partitioning.partition_count();
@@ -437,7 +433,7 @@ impl RepartitionExec {
 
         // execute the child operator
         let timer = r_metrics.fetch_time.timer();
-        let mut stream = input.execute(i, context).await?;
+        let mut stream = input.execute(i, context)?;
         timer.done();
 
         // While there are still outputs to send to, keep
@@ -689,7 +685,7 @@ mod tests {
         let mut output_partitions = vec![];
         for i in 0..exec.partitioning.partition_count() {
             // execute this *output* partition and collect all batches
-            let mut stream = exec.execute(i, task_ctx.clone()).await?;
+            let mut stream = exec.execute(i, task_ctx.clone())?;
             let mut batches = vec![];
             while let Some(result) = stream.next().await {
                 batches.push(result?);
@@ -745,7 +741,7 @@ mod tests {
         // returned and no results produced
         let partitioning = Partitioning::UnknownPartitioning(1);
         let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
-        let output_stream = exec.execute(0, task_ctx).await.unwrap();
+        let output_stream = exec.execute(0, task_ctx).unwrap();
 
         // Expect that an error is returned
         let result_string = crate::physical_plan::common::collect(output_stream)
@@ -773,7 +769,7 @@ mod tests {
 
         // Note: this should pass (the stream can be created) but the
         // error when the input is executed should get passed back
-        let output_stream = exec.execute(0, task_ctx).await.unwrap();
+        let output_stream = exec.execute(0, task_ctx).unwrap();
 
         // Expect that an error is returned
         let result_string = crate::physical_plan::common::collect(output_stream)
@@ -808,7 +804,7 @@ mod tests {
 
         // Note: this should pass (the stream can be created) but the
         // error when the input is executed should get passed back
-        let output_stream = exec.execute(0, task_ctx).await.unwrap();
+        let output_stream = exec.execute(0, task_ctx).unwrap();
 
         // Expect that an error is returned
         let result_string = crate::physical_plan::common::collect(output_stream)
@@ -860,7 +856,7 @@ mod tests {
 
         assert_batches_sorted_eq!(&expected, &expected_batches);
 
-        let output_stream = exec.execute(0, task_ctx).await.unwrap();
+        let output_stream = exec.execute(0, task_ctx).unwrap();
         let batches = crate::physical_plan::common::collect(output_stream)
             .await
             .unwrap();
@@ -880,8 +876,8 @@ mod tests {
         // partition into two output streams
         let exec = RepartitionExec::try_new(input.clone(), partitioning).unwrap();
 
-        let output_stream0 = exec.execute(0, task_ctx.clone()).await.unwrap();
-        let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap();
+        let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap();
+        let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap();
 
         // now, purposely drop output stream 0
         // *before* any outputs are produced
@@ -927,7 +923,7 @@ mod tests {
         // We first collect the results without droping the output stream.
         let input = Arc::new(make_barrier_exec());
         let exec = RepartitionExec::try_new(input.clone(), partitioning.clone()).unwrap();
-        let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap();
+        let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap();
         input.wait().await;
         let batches_without_drop = crate::physical_plan::common::collect(output_stream1)
             .await
@@ -947,8 +943,8 @@ mod tests {
         // Now do the same but dropping the stream before waiting for the barrier
         let input = Arc::new(make_barrier_exec());
         let exec = RepartitionExec::try_new(input.clone(), partitioning).unwrap();
-        let output_stream0 = exec.execute(0, task_ctx.clone()).await.unwrap();
-        let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap();
+        let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap();
+        let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap();
         // now, purposely drop output stream 0
         // *before* any outputs are produced
         std::mem::drop(output_stream0);
@@ -1053,11 +1049,11 @@ mod tests {
         let schema = batch.schema();
         let input = MockExec::new(vec![Ok(batch)], schema);
         let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
-        let output_stream0 = exec.execute(0, task_ctx.clone()).await.unwrap();
+        let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap();
         let batch0 = crate::physical_plan::common::collect(output_stream0)
             .await
             .unwrap();
-        let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap();
+        let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap();
         let batch1 = crate::physical_plan::common::collect(output_stream1)
             .await
             .unwrap();
diff --git a/datafusion/core/src/physical_plan/sort_merge_join.rs b/datafusion/core/src/physical_plan/sort_merge_join.rs
index 765f3ba25..c207917b6 100644
--- a/datafusion/core/src/physical_plan/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/sort_merge_join.rs
@@ -32,7 +32,6 @@ use arrow::compute::{take, SortOptions};
 use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
 use futures::{Stream, StreamExt};
 
 use crate::error::DataFusionError;
@@ -112,7 +111,6 @@ impl SortMergeJoinExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for SortMergeJoinExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -153,7 +151,7 @@ impl ExecutionPlan for SortMergeJoinExec {
         }
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -178,8 +176,8 @@ impl ExecutionPlan for SortMergeJoinExec {
         };
 
         // execute children plans
-        let streamed = streamed.execute(partition, context.clone()).await?;
-        let buffered = buffered.execute(partition, context.clone()).await?;
+        let streamed = streamed.execute(partition, context.clone())?;
+        let buffered = buffered.execute(partition, context.clone())?;
 
         // create output buffer
         let batch_size = context.session_config().batch_size;
@@ -1357,7 +1355,7 @@ mod tests {
         )?;
         let columns = columns(&join.schema());
 
-        let stream = join.execute(0, task_ctx).await?;
+        let stream = join.execute(0, task_ctx)?;
         let batches = common::collect(stream).await?;
         Ok((columns, batches))
     }
@@ -1374,7 +1372,7 @@ mod tests {
         let join = join(left, right, on, join_type)?;
         let columns = columns(&join.schema());
 
-        let stream = join.execute(0, task_ctx).await?;
+        let stream = join.execute(0, task_ctx)?;
         let batches = common::collect(stream).await?;
         Ok((columns, batches))
     }
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index d11a0cf29..3a0b5f2a1 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -694,7 +694,6 @@ impl SortExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for SortExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -748,7 +747,7 @@ impl ExecutionPlan for SortExec {
         )?))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -775,7 +774,7 @@ impl ExecutionPlan for SortExec {
             partition
         );
 
-        let input = self.input.execute(partition, context.clone()).await?;
+        let input = self.input.execute(partition, context.clone())?;
 
         debug!("End SortExec's input.execute for partition: {}", partition);
 
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 37ed4acb8..8e3326255 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -35,7 +35,6 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use async_trait::async_trait;
 use futures::stream::{Fuse, FusedStream};
 use futures::{Stream, StreamExt};
 use tokio::sync::mpsc;
@@ -108,7 +107,6 @@ impl SortPreservingMergeExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for SortPreservingMergeExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -150,7 +148,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
         )))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -182,7 +180,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
             )),
             1 => {
                 // bypass if there is only one partition to merge (no metrics in this case either)
-                let result = self.input.execute(0, context).await;
+                let result = self.input.execute(0, context);
                 debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input");
                 result
             }
@@ -210,20 +208,13 @@ impl ExecutionPlan for SortPreservingMergeExec {
                             )
                         })
                         .collect(),
-                    Err(_) => {
-                        futures::future::try_join_all((0..input_partitions).map(
-                            |partition| {
-                                let context = context.clone();
-                                async move {
-                                    self.input
-                                        .execute(partition, context)
-                                        .await
-                                        .map(|stream| SortedStream::new(stream, 0))
-                                }
-                            },
-                        ))
-                        .await?
-                    }
+                    Err(_) => (0..input_partitions)
+                        .map(|partition| {
+                            let stream =
+                                self.input.execute(partition, context.clone())?;
+                            Ok(SortedStream::new(stream, 0))
+                        })
+                        .collect::<Result<_>>()?,
                 };
 
                 debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute");
@@ -1209,7 +1200,7 @@ mod tests {
 
         for partition in 0..partition_count {
             let (sender, receiver) = mpsc::channel(1);
-            let mut stream = batches.execute(partition, task_ctx.clone()).await.unwrap();
+            let mut stream = batches.execute(partition, task_ctx.clone()).unwrap();
             let join_handle = tokio::spawn(async move {
                 while let Some(batch) = stream.next().await {
                     sender.send(batch).await.unwrap();
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index b794cad26..e89ac4a76 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -38,7 +38,6 @@ use crate::{
     error::Result,
     physical_plan::{expressions, metrics::BaselineMetrics},
 };
-use async_trait::async_trait;
 
 /// UNION ALL execution plan
 #[derive(Debug)]
@@ -64,7 +63,6 @@ impl UnionExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for UnionExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -107,7 +105,7 @@ impl ExecutionPlan for UnionExec {
         Ok(Arc::new(UnionExec::new(children)))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         mut partition: usize,
         context: Arc<TaskContext>,
@@ -123,7 +121,7 @@ impl ExecutionPlan for UnionExec {
         for input in self.inputs.iter() {
             // Calculate whether partition belongs to the current partition
             if partition < input.output_partitioning().partition_count() {
-                let stream = input.execute(partition, context.clone()).await?;
+                let stream = input.execute(partition, context)?;
                 debug!("Found a Union partition to execute");
                 return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
             } else {
diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index f2ba681ed..897936814 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -29,7 +29,6 @@ use crate::scalar::ScalarValue;
 use arrow::array::new_null_array;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
 use std::any::Any;
 use std::sync::Arc;
 
@@ -96,7 +95,6 @@ impl ValuesExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for ValuesExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -138,7 +136,7 @@ impl ExecutionPlan for ValuesExec {
         }))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         _context: Arc<TaskContext>,
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 8b545a12b..e9eac35a3 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -33,7 +33,6 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use async_trait::async_trait;
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
 use std::any::Any;
@@ -90,7 +89,6 @@ impl WindowAggExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for WindowAggExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -148,12 +146,12 @@ impl ExecutionPlan for WindowAggExec {
         )?))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let input = self.input.execute(partition, context).await?;
+        let input = self.input.execute(partition, context)?;
         let stream = Box::pin(WindowAggStream::new(
             self.schema.clone(),
             self.window_expr.clone(),
diff --git a/datafusion/core/src/scheduler/mod.rs b/datafusion/core/src/scheduler/mod.rs
index a765ddf83..4cdc34b72 100644
--- a/datafusion/core/src/scheduler/mod.rs
+++ b/datafusion/core/src/scheduler/mod.rs
@@ -76,7 +76,6 @@
 
 use std::sync::Arc;
 
-use futures::stream::BoxStream;
 use log::{debug, error};
 
 use crate::error::Result;
diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs
index baf487d98..20e7c6e79 100644
--- a/datafusion/core/src/scheduler/pipeline/execution.rs
+++ b/datafusion/core/src/scheduler/pipeline/execution.rs
@@ -22,9 +22,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll, Waker};
 
-use arrow::error::ArrowError;
-use async_trait::async_trait;
-use futures::{Stream, StreamExt, TryStreamExt};
+use futures::{Stream, StreamExt};
 use parking_lot::Mutex;
 
 use crate::arrow::datatypes::SchemaRef;
@@ -39,7 +37,6 @@ use crate::physical_plan::{
 };
 
 use crate::scheduler::pipeline::Pipeline;
-use crate::scheduler::BoxStream;
 
 /// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and
 /// converts it to the push-based [`Pipeline`] interface
@@ -57,7 +54,7 @@ use crate::scheduler::BoxStream;
 pub struct ExecutionPipeline {
     proxied: Arc<dyn ExecutionPlan>,
     inputs: Vec<Vec<Arc<Mutex<InputPartition>>>>,
-    outputs: Vec<Mutex<BoxStream<'static, ArrowResult<RecordBatch>>>>,
+    outputs: Vec<Mutex<SendableRecordBatchStream>>,
 }
 
 impl std::fmt::Debug for ExecutionPipeline {
@@ -125,23 +122,8 @@ impl ExecutionPipeline {
         // Construct the output streams
         let output_count = proxied.output_partitioning().partition_count();
         let outputs = (0..output_count)
-            .map(|x| {
-                let proxy_captured = proxied.clone();
-                let task_captured = task_context.clone();
-                let fut = async move {
-                    proxy_captured
-                        .execute(x, task_captured)
-                        .await
-                        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                };
-
-                // Use futures::stream::once to handle operators that perform computation
-                // within `ExecutionPlan::execute`. If we evaluated these futures here
-                // we could potentially block indefinitely waiting for inputs that will
-                // never arrive as the query isn't scheduled yet
-                Mutex::new(futures::stream::once(fut).try_flatten().boxed())
-            })
-            .collect();
+            .map(|x| proxied.execute(x, task_context.clone()).map(Mutex::new))
+            .collect::<Result<_>>()?;
 
         Ok(Self {
             proxied,
@@ -236,7 +218,6 @@ struct ProxyExecutionPlan {
     inputs: Vec<Arc<Mutex<InputPartition>>>,
 }
 
-#[async_trait]
 impl ExecutionPlan for ProxyExecutionPlan {
     fn as_any(&self) -> &dyn Any {
         self
@@ -281,7 +262,7 @@ impl ExecutionPlan for ProxyExecutionPlan {
         unimplemented!()
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         _context: Arc<TaskContext>,
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index 7e0cbe35f..855bb3bbc 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -17,7 +17,6 @@
 
 //! Simple iterator over batches for use in testing
 
-use async_trait::async_trait;
 use std::{
     any::Any,
     pin::Pin,
@@ -138,7 +137,6 @@ impl MockExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for MockExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -168,7 +166,7 @@ impl ExecutionPlan for MockExec {
     }
 
     /// Returns a stream which yields data
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         _context: Arc<TaskContext>,
@@ -277,7 +275,6 @@ impl BarrierExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for BarrierExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -307,7 +304,7 @@ impl ExecutionPlan for BarrierExec {
     }
 
     /// Returns a stream which yields data
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         _context: Arc<TaskContext>,
@@ -378,7 +375,6 @@ impl ErrorExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for ErrorExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -408,7 +404,7 @@ impl ExecutionPlan for ErrorExec {
     }
 
     /// Returns a stream which yields data
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         _context: Arc<TaskContext>,
@@ -458,7 +454,6 @@ impl StatisticsExec {
         }
     }
 }
-#[async_trait]
 impl ExecutionPlan for StatisticsExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -487,7 +482,7 @@ impl ExecutionPlan for StatisticsExec {
         Ok(self)
     }
 
-    async fn execute(
+    fn execute(
         &self,
         _partition: usize,
         _context: Arc<TaskContext>,
@@ -552,7 +547,6 @@ impl BlockingExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for BlockingExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -585,7 +579,7 @@ impl ExecutionPlan for BlockingExec {
         )))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         _partition: usize,
         _context: Arc<TaskContext>,
diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs
index dbaaca206..81e4706de 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -98,31 +98,36 @@ impl Stream for TestCustomRecordBatchStream {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for CustomExecutionPlan {
     fn as_any(&self) -> &dyn Any {
         self
     }
+
     fn schema(&self) -> SchemaRef {
         let schema = TEST_CUSTOM_SCHEMA_REF!();
         project_schema(&schema, self.projection.as_ref()).expect("projected schema")
     }
+
     fn output_partitioning(&self) -> Partitioning {
         Partitioning::UnknownPartitioning(1)
     }
+
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         None
     }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![]
     }
+
     fn with_new_children(
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(self)
     }
-    async fn execute(
+
+    fn execute(
         &self,
         _partition: usize,
         _context: Arc<TaskContext>,
diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs
index 664e77e18..49cd70143 100644
--- a/datafusion/core/tests/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/provider_filter_pushdown.rs
@@ -55,7 +55,6 @@ struct CustomPlan {
     batches: Vec<Arc<RecordBatch>>,
 }
 
-#[async_trait]
 impl ExecutionPlan for CustomPlan {
     fn as_any(&self) -> &dyn std::any::Any {
         self
@@ -84,7 +83,7 @@ impl ExecutionPlan for CustomPlan {
         unreachable!()
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         _context: Arc<TaskContext>,
diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs
index d57c218d4..031506704 100644
--- a/datafusion/core/tests/statistics.rs
+++ b/datafusion/core/tests/statistics.rs
@@ -106,7 +106,6 @@ impl TableProvider for StatisticsValidation {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for StatisticsValidation {
     fn as_any(&self) -> &dyn Any {
         self
@@ -135,7 +134,7 @@ impl ExecutionPlan for StatisticsValidation {
         Ok(self)
     }
 
-    async fn execute(
+    fn execute(
         &self,
         _partition: usize,
         _context: Arc<TaskContext>,
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 43e6eeacd..d062cf3a3 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -457,7 +457,7 @@ impl ExecutionPlan for TopKExec {
     }
 
     /// Execute one partition and return an iterator over RecordBatch
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -470,7 +470,7 @@ impl ExecutionPlan for TopKExec {
         }
 
         Ok(Box::pin(TopKReader {
-            input: self.input.execute(partition, context).await?,
+            input: self.input.execute(partition, context)?,
             k: self.k,
             done: false,
             state: BTreeMap::new(),