You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/04 16:09:09 UTC
[arrow-datafusion] branch master updated: Make ExecutionPlan sync (#2307) (#2434)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 807b7a5f7 Make ExecutionPlan sync (#2307) (#2434)
807b7a5f7 is described below
commit 807b7a5f7eb858e9f7162e1f00ffeeedd0bf2050
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed May 4 17:09:03 2022 +0100
Make ExecutionPlan sync (#2307) (#2434)
---
.../core/src/execution_plans/distributed_query.rs | 4 +--
.../core/src/execution_plans/shuffle_reader.rs | 4 +--
.../core/src/execution_plans/shuffle_writer.rs | 10 +++---
.../core/src/execution_plans/unresolved_shuffle.rs | 4 +--
ballista/rust/core/src/serde/mod.rs | 3 +-
ballista/rust/executor/src/collect.rs | 10 ++----
datafusion-examples/examples/custom_datasource.rs | 7 ++--
datafusion/core/src/datasource/file_format/csv.rs | 2 +-
datafusion/core/src/datasource/file_format/json.rs | 2 +-
.../core/src/datasource/file_format/parquet.rs | 2 +-
datafusion/core/src/datasource/memory.rs | 10 +++---
.../src/physical_optimizer/aggregate_statistics.rs | 2 +-
.../core/src/physical_plan/aggregates/mod.rs | 15 +++------
datafusion/core/src/physical_plan/analyze.rs | 6 ++--
.../core/src/physical_plan/coalesce_batches.rs | 8 ++---
.../core/src/physical_plan/coalesce_partitions.rs | 9 ++---
datafusion/core/src/physical_plan/common.rs | 2 +-
datafusion/core/src/physical_plan/cross_join.rs | 7 ++--
datafusion/core/src/physical_plan/empty.rs | 12 +++----
datafusion/core/src/physical_plan/explain.rs | 4 +--
.../core/src/physical_plan/file_format/avro.rs | 6 ++--
.../core/src/physical_plan/file_format/csv.rs | 14 ++++----
.../core/src/physical_plan/file_format/json.rs | 12 +++----
.../core/src/physical_plan/file_format/parquet.rs | 14 ++++----
datafusion/core/src/physical_plan/filter.rs | 8 ++---
datafusion/core/src/physical_plan/hash_join.rs | 34 +++++++++----------
datafusion/core/src/physical_plan/limit.rs | 13 +++-----
datafusion/core/src/physical_plan/memory.rs | 8 ++---
datafusion/core/src/physical_plan/mod.rs | 10 +++---
datafusion/core/src/physical_plan/planner.rs | 4 +--
datafusion/core/src/physical_plan/projection.rs | 8 ++---
datafusion/core/src/physical_plan/repartition.rs | 38 ++++++++++------------
.../core/src/physical_plan/sort_merge_join.rs | 12 +++----
datafusion/core/src/physical_plan/sorts/sort.rs | 5 ++-
.../physical_plan/sorts/sort_preserving_merge.rs | 29 ++++++-----------
datafusion/core/src/physical_plan/union.rs | 6 ++--
datafusion/core/src/physical_plan/values.rs | 4 +--
.../src/physical_plan/windows/window_agg_exec.rs | 6 ++--
datafusion/core/src/scheduler/mod.rs | 1 -
.../core/src/scheduler/pipeline/execution.rs | 29 +++--------------
datafusion/core/src/test/exec.rs | 16 +++------
datafusion/core/tests/custom_sources.rs | 9 +++--
datafusion/core/tests/provider_filter_pushdown.rs | 3 +-
datafusion/core/tests/statistics.rs | 3 +-
datafusion/core/tests/user_defined_plan.rs | 4 +--
45 files changed, 160 insertions(+), 259 deletions(-)
diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index ed3c9fceb..0b20acb47 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -41,7 +41,6 @@ use datafusion::physical_plan::{
use crate::serde::protobuf::execute_query_params::OptionalSessionId;
use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
-use async_trait::async_trait;
use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::context::TaskContext;
@@ -122,7 +121,6 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
}
}
-#[async_trait]
impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
fn as_any(&self) -> &dyn Any {
self
@@ -162,7 +160,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
}))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 27252b980..3046a2276 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -21,7 +21,6 @@ use std::sync::Arc;
use crate::client::BallistaClient;
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
-use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::{DataFusionError, Result};
@@ -64,7 +63,6 @@ impl ShuffleReaderExec {
}
}
-#[async_trait]
impl ExecutionPlan for ShuffleReaderExec {
fn as_any(&self) -> &dyn Any {
self
@@ -101,7 +99,7 @@ impl ExecutionPlan for ShuffleReaderExec {
))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index f5c98b200..45a102185 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -33,7 +33,6 @@ use crate::utils;
use crate::serde::protobuf::ShuffleWritePartition;
use crate::serde::scheduler::PartitionStats;
-use async_trait::async_trait;
use datafusion::arrow::array::{
ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder, UInt64Builder,
};
@@ -155,7 +154,7 @@ impl ShuffleWriterExec {
async move {
let now = Instant::now();
- let mut stream = plan.execute(input_partition, context).await?;
+ let mut stream = plan.execute(input_partition, context)?;
match output_partitioning {
None => {
@@ -293,7 +292,6 @@ impl ShuffleWriterExec {
}
}
-#[async_trait]
impl ExecutionPlan for ShuffleWriterExec {
fn as_any(&self) -> &dyn Any {
self
@@ -336,7 +334,7 @@ impl ExecutionPlan for ShuffleWriterExec {
)?))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -459,7 +457,7 @@ mod tests {
work_dir.into_path().to_str().unwrap().to_owned(),
Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
)?;
- let mut stream = query_stage.execute(0, task_ctx).await?;
+ let mut stream = query_stage.execute(0, task_ctx)?;
let batches = utils::collect_stream(&mut stream)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -516,7 +514,7 @@ mod tests {
work_dir.into_path().to_str().unwrap().to_owned(),
Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
)?;
- let mut stream = query_stage.execute(0, task_ctx).await?;
+ let mut stream = query_stage.execute(0, task_ctx)?;
let batches = utils::collect_stream(&mut stream)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index e1eecdd92..15d403fb6 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -18,7 +18,6 @@
use std::any::Any;
use std::sync::Arc;
-use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
@@ -63,7 +62,6 @@ impl UnresolvedShuffleExec {
}
}
-#[async_trait]
impl ExecutionPlan for UnresolvedShuffleExec {
fn as_any(&self) -> &dyn Any {
self
@@ -101,7 +99,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
))
}
- async fn execute(
+ fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index ed41ce61c..bc2d7ff6b 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -477,7 +477,6 @@ mod tests {
}
}
- #[async_trait]
impl ExecutionPlan for TopKExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -515,7 +514,7 @@ mod tests {
}
/// Execute one partition and return an iterator over RecordBatch
- async fn execute(
+ fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
diff --git a/ballista/rust/executor/src/collect.rs b/ballista/rust/executor/src/collect.rs
index 1bb4acaf8..54e97550a 100644
--- a/ballista/rust/executor/src/collect.rs
+++ b/ballista/rust/executor/src/collect.rs
@@ -22,7 +22,6 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, pin::Pin};
-use async_trait::async_trait;
use datafusion::arrow::{
datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
};
@@ -49,7 +48,6 @@ impl CollectExec {
}
}
-#[async_trait]
impl ExecutionPlan for CollectExec {
fn as_any(&self) -> &dyn Any {
self
@@ -78,7 +76,7 @@ impl ExecutionPlan for CollectExec {
unimplemented!()
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -86,10 +84,8 @@ impl ExecutionPlan for CollectExec {
assert_eq!(0, partition);
let num_partitions = self.plan.output_partitioning().partition_count();
- let futures = (0..num_partitions).map(|i| self.plan.execute(i, context.clone()));
- let streams = futures::future::join_all(futures)
- .await
- .into_iter()
+ let streams = (0..num_partitions)
+ .map(|i| self.plan.execute(i, context.clone()))
.collect::<Result<Vec<_>>>()
.map_err(|e| DataFusionError::Execution(format!("BallistaError: {:?}", e)))?;
diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index a4b9fda1a..d8a908986 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -196,7 +196,6 @@ impl CustomExec {
}
}
-#[async_trait]
impl ExecutionPlan for CustomExec {
fn as_any(&self) -> &dyn Any {
self
@@ -225,7 +224,7 @@ impl ExecutionPlan for CustomExec {
Ok(self)
}
- async fn execute(
+ fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
@@ -243,7 +242,7 @@ impl ExecutionPlan for CustomExec {
account_array.append_value(user.bank_account)?;
}
- return Ok(Box::pin(MemoryStream::try_new(
+ Ok(Box::pin(MemoryStream::try_new(
vec![RecordBatch::try_new(
self.projected_schema.clone(),
vec![
@@ -253,7 +252,7 @@ impl ExecutionPlan for CustomExec {
)?],
self.schema(),
None,
- )?));
+ )?))
}
fn statistics(&self) -> Statistics {
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 87bc6be1a..cbc2adca1 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -161,7 +161,7 @@ mod tests {
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
let task_ctx = ctx.task_ctx();
- let stream = exec.execute(0, task_ctx).await?;
+ let stream = exec.execute(0, task_ctx)?;
let tt_batches: i32 = stream
.map(|batch| {
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index c56b84353..cd4fd5810 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -131,7 +131,7 @@ mod tests {
let projection = None;
let exec = get_exec(&projection, None).await?;
let task_ctx = ctx.task_ctx();
- let stream = exec.execute(0, task_ctx).await?;
+ let stream = exec.execute(0, task_ctx)?;
let tt_batches: i32 = stream
.map(|batch| {
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index daadff97d..a9a0c788b 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -512,7 +512,7 @@ mod tests {
let projection = None;
let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
let task_ctx = ctx.task_ctx();
- let stream = exec.execute(0, task_ctx).await?;
+ let stream = exec.execute(0, task_ctx)?;
let tt_batches = stream
.map(|batch| {
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 90e429b18..72630b39c 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -76,7 +76,7 @@ impl MemTable {
let context1 = context.clone();
let exec = exec.clone();
tokio::spawn(async move {
- let stream = exec.execute(part_i, context1.clone()).await?;
+ let stream = exec.execute(part_i, context1.clone())?;
common::collect(stream).await
})
})
@@ -103,7 +103,7 @@ impl MemTable {
let mut output_partitions = vec![];
for i in 0..exec.output_partitioning().partition_count() {
// execute this *output* partition and collect all batches
- let mut stream = exec.execute(i, context.clone()).await?;
+ let mut stream = exec.execute(i, context.clone())?;
let mut batches = vec![];
while let Some(result) = stream.next().await {
batches.push(result?);
@@ -177,7 +177,7 @@ mod tests {
// scan with projection
let exec = provider.scan(&Some(vec![2, 1]), &[], None).await?;
- let mut it = exec.execute(0, task_ctx).await?;
+ let mut it = exec.execute(0, task_ctx)?;
let batch2 = it.next().await.unwrap()?;
assert_eq!(2, batch2.schema().fields().len());
assert_eq!("c", batch2.schema().field(0).name());
@@ -209,7 +209,7 @@ mod tests {
let provider = MemTable::try_new(schema, vec![vec![batch]])?;
let exec = provider.scan(&None, &[], None).await?;
- let mut it = exec.execute(0, task_ctx).await?;
+ let mut it = exec.execute(0, task_ctx)?;
let batch1 = it.next().await.unwrap()?;
assert_eq!(3, batch1.schema().fields().len());
assert_eq!(3, batch1.num_columns());
@@ -365,7 +365,7 @@ mod tests {
MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;
let exec = provider.scan(&None, &[], None).await?;
- let mut it = exec.execute(0, task_ctx).await?;
+ let mut it = exec.execute(0, task_ctx)?;
let batch1 = it.next().await.unwrap()?;
assert_eq!(3, batch1.schema().fields().len());
assert_eq!(3, batch1.num_columns());
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index 9c548ab9b..4cf96d235 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -304,7 +304,7 @@ mod tests {
// A ProjectionExec is a sign that the count optimization was applied
assert!(optimized.as_any().is::<ProjectionExec>());
- let result = common::collect(optimized.execute(0, task_ctx).await?).await?;
+ let result = common::collect(optimized.execute(0, task_ctx)?).await?;
assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));
assert_eq!(
result[0]
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 5e1da793c..3682ec6eb 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -29,7 +29,6 @@ use crate::physical_plan::{
};
use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema, SchemaRef};
-use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_expr::Accumulator;
use datafusion_physical_expr::expressions::Column;
@@ -145,7 +144,6 @@ impl AggregateExec {
}
}
-#[async_trait]
impl ExecutionPlan for AggregateExec {
/// Return a reference to Any that can be used for down-casting
fn as_any(&self) -> &dyn Any {
@@ -196,12 +194,12 @@ impl ExecutionPlan for AggregateExec {
)?))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let input = self.input.execute(partition, context).await?;
+ let input = self.input.execute(partition, context)?;
let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
@@ -417,7 +415,6 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
- use async_trait::async_trait;
use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, PhysicalSortExpr};
use futures::{FutureExt, Stream};
@@ -489,8 +486,7 @@ mod tests {
)?);
let result =
- common::collect(partial_aggregate.execute(0, task_ctx.clone()).await?)
- .await?;
+ common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?;
let expected = vec![
"+---+---------------+-------------+",
@@ -522,7 +518,7 @@ mod tests {
)?);
let result =
- common::collect(merged_aggregate.execute(0, task_ctx.clone()).await?).await?;
+ common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?;
assert_eq!(result.len(), 1);
let batch = &result[0];
@@ -556,7 +552,6 @@ mod tests {
pub yield_first: bool,
}
- #[async_trait]
impl ExecutionPlan for TestYieldingExec {
fn as_any(&self) -> &dyn Any {
self
@@ -587,7 +582,7 @@ mod tests {
)))
}
- async fn execute(
+ fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index f8050f16c..c2f08c69a 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -33,7 +33,6 @@ use futures::StreamExt;
use super::expressions::PhysicalSortExpr;
use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
use crate::execution::context::TaskContext;
-use async_trait::async_trait;
/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
/// discards the results, and then prints out an annotated plan with metrics
@@ -58,7 +57,6 @@ impl AnalyzeExec {
}
}
-#[async_trait]
impl ExecutionPlan for AnalyzeExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -102,7 +100,7 @@ impl ExecutionPlan for AnalyzeExec {
)))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -126,7 +124,7 @@ impl ExecutionPlan for AnalyzeExec {
let (tx, rx) = tokio::sync::mpsc::channel(input_partitions);
let captured_input = self.input.clone();
- let mut input_stream = captured_input.execute(0, context).await?;
+ let mut input_stream = captured_input.execute(0, context)?;
let captured_schema = self.schema.clone();
let verbose = self.verbose;
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 482b0ee77..75ecaf53e 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -34,7 +34,6 @@ use arrow::compute::kernels::concat::concat;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use log::debug;
@@ -75,7 +74,6 @@ impl CoalesceBatchesExec {
}
}
-#[async_trait]
impl ExecutionPlan for CoalesceBatchesExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -116,13 +114,13 @@ impl ExecutionPlan for CoalesceBatchesExec {
)))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(CoalesceBatchesStream {
- input: self.input.execute(partition, context).await?,
+ input: self.input.execute(partition, context)?,
schema: self.input.schema(),
target_batch_size: self.target_batch_size,
buffer: Vec::new(),
@@ -348,7 +346,7 @@ mod tests {
for i in 0..output_partition_count {
// execute this *output* partition and collect all batches
let task_ctx = session_ctx.task_ctx();
- let mut stream = exec.execute(i, task_ctx.clone()).await?;
+ let mut stream = exec.execute(i, task_ctx.clone())?;
let mut batches = vec![];
while let Some(result) = stream.next().await {
batches.push(result?);
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index 35f75db03..11fcd5d50 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -25,8 +25,6 @@ use std::task::Poll;
use futures::Stream;
use tokio::sync::mpsc;
-use async_trait::async_trait;
-
use arrow::record_batch::RecordBatch;
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};
@@ -66,7 +64,6 @@ impl CoalescePartitionsExec {
}
}
-#[async_trait]
impl ExecutionPlan for CoalescePartitionsExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -101,7 +98,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone())))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -121,7 +118,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
)),
1 => {
// bypass any threading / metrics if there is a single partition
- self.input.execute(0, context).await
+ self.input.execute(0, context)
}
_ => {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
@@ -252,7 +249,7 @@ mod tests {
assert_eq!(merge.output_partitioning().partition_count(), 1);
// the result should contain 4 batches (one per input partition)
- let iter = merge.execute(0, task_ctx).await?;
+ let iter = merge.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;
assert_eq!(batches.len(), num_partitions);
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 68bd676dd..24df647dc 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -179,7 +179,7 @@ pub(crate) fn spawn_execution(
context: Arc<TaskContext>,
) -> JoinHandle<()> {
tokio::spawn(async move {
- let mut stream = match input.execute(partition, context).await {
+ let mut stream = match input.execute(partition, context) {
Err(e) => {
// If send fails, plan being torn
// down, no place to send the error
diff --git a/datafusion/core/src/physical_plan/cross_join.rs b/datafusion/core/src/physical_plan/cross_join.rs
index 2846af9c5..e3f25fc56 100644
--- a/datafusion/core/src/physical_plan/cross_join.rs
+++ b/datafusion/core/src/physical_plan/cross_join.rs
@@ -111,7 +111,7 @@ async fn load_left_input(
// merge all left parts into a single stream
let merge = CoalescePartitionsExec::new(left.clone());
- let stream = merge.execute(0, context).await?;
+ let stream = merge.execute(0, context)?;
// Load all batches and count the rows
let (batches, num_rows) = stream
@@ -133,7 +133,6 @@ async fn load_left_input(
Ok(merged_batch)
}
-#[async_trait]
impl ExecutionPlan for CrossJoinExec {
fn as_any(&self) -> &dyn Any {
self
@@ -169,12 +168,12 @@ impl ExecutionPlan for CrossJoinExec {
false
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let stream = self.right.execute(partition, context.clone()).await?;
+ let stream = self.right.execute(partition, context.clone())?;
let left_fut = self
.left_fut
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index 43e749e1d..bba87e190 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -33,7 +33,6 @@ use super::expressions::PhysicalSortExpr;
use super::{common, SendableRecordBatchStream, Statistics};
use crate::execution::context::TaskContext;
-use async_trait::async_trait;
/// Execution plan for empty relation (produces no rows)
#[derive(Debug)]
@@ -76,7 +75,6 @@ impl EmptyExec {
}
}
-#[async_trait]
impl ExecutionPlan for EmptyExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -114,7 +112,7 @@ impl ExecutionPlan for EmptyExec {
)))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -172,7 +170,7 @@ mod tests {
assert_eq!(empty.schema(), schema);
// we should have no results
- let iter = empty.execute(0, task_ctx).await?;
+ let iter = empty.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;
assert!(batches.is_empty());
@@ -208,8 +206,8 @@ mod tests {
let empty = EmptyExec::new(false, schema);
// ask for the wrong partition
- assert!(empty.execute(1, task_ctx.clone()).await.is_err());
- assert!(empty.execute(20, task_ctx).await.is_err());
+ assert!(empty.execute(1, task_ctx.clone()).is_err());
+ assert!(empty.execute(20, task_ctx).is_err());
Ok(())
}
@@ -220,7 +218,7 @@ mod tests {
let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(true, schema);
- let iter = empty.execute(0, task_ctx).await?;
+ let iter = empty.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;
// should have one item
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index fb6631588..fdc139a7e 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -34,7 +34,6 @@ use log::debug;
use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
use crate::execution::context::TaskContext;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
-use async_trait::async_trait;
/// Explain execution plan operator. This operator contains the string
/// values of the various plans it has when it is created, and passes
@@ -74,7 +73,6 @@ impl ExplainExec {
}
}
-#[async_trait]
impl ExecutionPlan for ExplainExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -110,7 +108,7 @@ impl ExecutionPlan for ExplainExec {
Ok(self)
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 68f8f2f90..eed0161a2 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -28,7 +28,6 @@ use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use crate::execution::context::TaskContext;
-use async_trait::async_trait;
use std::any::Any;
use std::sync::Arc;
@@ -61,7 +60,6 @@ impl AvroExec {
}
}
-#[async_trait]
impl ExecutionPlan for AvroExec {
fn as_any(&self) -> &dyn Any {
self
@@ -95,7 +93,7 @@ impl ExecutionPlan for AvroExec {
}
#[cfg(not(feature = "avro"))]
- async fn execute(
+ fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
@@ -106,7 +104,7 @@ impl ExecutionPlan for AvroExec {
}
#[cfg(feature = "avro")]
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 8aea607ea..96bceb225 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -26,7 +26,6 @@ use crate::physical_plan::{
use arrow::csv;
use arrow::datatypes::SchemaRef;
-use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use std::any::Any;
use std::fs;
@@ -75,7 +74,6 @@ impl CsvExec {
}
}
-#[async_trait]
impl ExecutionPlan for CsvExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -112,7 +110,7 @@ impl ExecutionPlan for CsvExec {
Ok(self)
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -191,7 +189,7 @@ pub async fn plan_to_csv(
let file = fs::File::create(path)?;
let mut writer = csv::Writer::new(file);
let task_ctx = Arc::new(TaskContext::from(state));
- let stream = plan.execute(i, task_ctx).await?;
+ let stream = plan.execute(i, task_ctx)?;
let handle: JoinHandle<Result<()>> = task::spawn(async move {
stream
.map(|batch| writer.write(&batch?))
@@ -250,7 +248,7 @@ mod tests {
assert_eq!(3, csv.projected_schema.fields().len());
assert_eq!(3, csv.schema().fields().len());
- let mut stream = csv.execute(0, task_ctx).await?;
+ let mut stream = csv.execute(0, task_ctx)?;
let batch = stream.next().await.unwrap()?;
assert_eq!(3, batch.num_columns());
assert_eq!(100, batch.num_rows());
@@ -297,7 +295,7 @@ mod tests {
assert_eq!(13, csv.projected_schema.fields().len());
assert_eq!(13, csv.schema().fields().len());
- let mut it = csv.execute(0, task_ctx).await?;
+ let mut it = csv.execute(0, task_ctx)?;
let batch = it.next().await.unwrap()?;
assert_eq!(13, batch.num_columns());
assert_eq!(5, batch.num_rows());
@@ -344,7 +342,7 @@ mod tests {
assert_eq!(14, csv.projected_schema.fields().len());
assert_eq!(14, csv.schema().fields().len());
- let mut it = csv.execute(0, task_ctx).await?;
+ let mut it = csv.execute(0, task_ctx)?;
let batch = it.next().await.unwrap()?;
assert_eq!(14, batch.num_columns());
assert_eq!(5, batch.num_rows());
@@ -398,7 +396,7 @@ mod tests {
assert_eq!(2, csv.projected_schema.fields().len());
assert_eq!(2, csv.schema().fields().len());
- let mut it = csv.execute(0, task_ctx).await?;
+ let mut it = csv.execute(0, task_ctx)?;
let batch = it.next().await.unwrap()?;
assert_eq!(2, batch.num_columns());
assert_eq!(100, batch.num_rows());
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 5c02a9c92..818496d13 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -17,7 +17,6 @@
//! Execution plan for reading line-delimited JSON files
use arrow::json::reader::DecoderOptions;
-use async_trait::async_trait;
use crate::error::{DataFusionError, Result};
use crate::execution::context::SessionState;
@@ -58,7 +57,6 @@ impl NdJsonExec {
}
}
-#[async_trait]
impl ExecutionPlan for NdJsonExec {
fn as_any(&self) -> &dyn Any {
self
@@ -91,7 +89,7 @@ impl ExecutionPlan for NdJsonExec {
Ok(self)
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -168,7 +166,7 @@ pub async fn plan_to_json(
let file = fs::File::create(path)?;
let mut writer = json::LineDelimitedWriter::new(file);
let task_ctx = Arc::new(TaskContext::from(state));
- let stream = plan.execute(i, task_ctx).await?;
+ let stream = plan.execute(i, task_ctx)?;
let handle: JoinHandle<Result<()>> = task::spawn(async move {
stream
.map(|batch| writer.write(batch?))
@@ -255,7 +253,7 @@ mod tests {
&DataType::Utf8
);
- let mut it = exec.execute(0, task_ctx).await?;
+ let mut it = exec.execute(0, task_ctx)?;
let batch = it.next().await.unwrap()?;
assert_eq!(batch.num_rows(), 3);
@@ -296,7 +294,7 @@ mod tests {
table_partition_cols: vec![],
});
- let mut it = exec.execute(0, task_ctx).await?;
+ let mut it = exec.execute(0, task_ctx)?;
let batch = it.next().await.unwrap()?;
assert_eq!(batch.num_rows(), 3);
@@ -335,7 +333,7 @@ mod tests {
inferred_schema.field_with_name("c").unwrap();
inferred_schema.field_with_name("d").unwrap_err();
- let mut it = exec.execute(0, task_ctx).await?;
+ let mut it = exec.execute(0, task_ctx)?;
let batch = it.next().await.unwrap()?;
assert_eq!(batch.num_rows(), 4);
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index df7327aa0..d2e156f32 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -33,7 +33,6 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
-use async_trait::async_trait;
use futures::{Stream, StreamExt, TryStreamExt};
use log::debug;
use parquet::arrow::{
@@ -165,7 +164,6 @@ impl ParquetFileMetrics {
}
}
-#[async_trait]
impl ExecutionPlan for ParquetExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -201,7 +199,7 @@ impl ExecutionPlan for ParquetExec {
Ok(self)
}
- async fn execute(
+ fn execute(
&self,
partition_index: usize,
context: Arc<TaskContext>,
@@ -592,7 +590,7 @@ pub async fn plan_to_parquet(
writer_properties.clone(),
)?;
let task_ctx = Arc::new(TaskContext::from(state));
- let stream = plan.execute(i, task_ctx).await?;
+ let stream = plan.execute(i, task_ctx)?;
let handle: tokio::task::JoinHandle<Result<()>> =
tokio::task::spawn(async move {
stream
@@ -1059,7 +1057,7 @@ mod tests {
);
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
- let mut results = parquet_exec.execute(0, task_ctx).await?;
+ let mut results = parquet_exec.execute(0, task_ctx)?;
let batch = results.next().await.unwrap()?;
assert_eq!(8, batch.num_rows());
@@ -1111,7 +1109,7 @@ mod tests {
None,
);
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
- let results = parquet_exec.execute(0, task_ctx).await?.next().await;
+ let results = parquet_exec.execute(0, task_ctx)?.next().await;
if let Some(expected_row_num) = expected_row_num {
let batch = results.unwrap()?;
@@ -1190,7 +1188,7 @@ mod tests {
);
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
- let mut results = parquet_exec.execute(0, task_ctx).await?;
+ let mut results = parquet_exec.execute(0, task_ctx)?;
let batch = results.next().await.unwrap()?;
let expected = vec![
"+----+----------+-------------+-------+",
@@ -1247,7 +1245,7 @@ mod tests {
None,
);
- let mut results = parquet_exec.execute(0, task_ctx).await?;
+ let mut results = parquet_exec.execute(0, task_ctx)?;
let batch = results.next().await.unwrap();
// invalid file should produce an error to that effect
assert_contains!(
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 158bedf28..4aa6453bf 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -36,7 +36,6 @@ use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
use log::debug;
use crate::execution::context::TaskContext;
@@ -84,7 +83,6 @@ impl FilterExec {
}
}
-#[async_trait]
impl ExecutionPlan for FilterExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -129,7 +127,7 @@ impl ExecutionPlan for FilterExec {
)?))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -137,9 +135,9 @@ impl ExecutionPlan for FilterExec {
debug!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
Ok(Box::pin(FilterExecStream {
- schema: self.input.schema().clone(),
+ schema: self.input.schema(),
predicate: self.predicate.clone(),
- input: self.input.execute(partition, context).await?,
+ input: self.input.execute(partition, context)?,
baseline_metrics,
}))
}
diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs
index 31882c63c..8a4a342c1 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/hash_join.rs
@@ -34,7 +34,6 @@ use std::sync::Arc;
use std::{any::Any, usize};
use std::{time::Instant, vec};
-use async_trait::async_trait;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use arrow::array::{new_null_array, Array};
@@ -250,7 +249,6 @@ impl HashJoinExec {
}
}
-#[async_trait]
impl ExecutionPlan for HashJoinExec {
fn as_any(&self) -> &dyn Any {
self
@@ -290,7 +288,7 @@ impl ExecutionPlan for HashJoinExec {
false
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -318,7 +316,7 @@ impl ExecutionPlan for HashJoinExec {
// we have the batches and the hash map with their keys. We can now create a stream
// over the right that uses this information to issue new batches.
- let right_stream = self.right.execute(partition, context).await?;
+ let right_stream = self.right.execute(partition, context)?;
Ok(Box::pin(HashJoinStream {
schema: self.schema(),
@@ -375,7 +373,7 @@ async fn collect_left_input(
// merge all left parts into a single stream
let merge = CoalescePartitionsExec::new(left);
- let stream = merge.execute(0, context).await?;
+ let stream = merge.execute(0, context)?;
// This operation performs 2 steps at once:
// 1. creates a [JoinHashMap] of all batches from the stream
@@ -430,7 +428,7 @@ async fn partitioned_left_input(
let start = Instant::now();
// Load 1 partition of left side in memory
- let stream = left.execute(partition, context.clone()).await?;
+ let stream = left.execute(partition, context.clone())?;
// This operation performs 2 steps at once:
// 1. creates a [JoinHashMap] of all batches from the stream
@@ -1122,7 +1120,7 @@ mod tests {
let join = join(left, right, on, join_type, null_equals_null)?;
let columns = columns(&join.schema());
- let stream = join.execute(0, context).await?;
+ let stream = join.execute(0, context)?;
let batches = common::collect(stream).await?;
Ok((columns, batches))
@@ -1167,7 +1165,7 @@ mod tests {
let mut batches = vec![];
for i in 0..partition_count {
- let stream = join.execute(i, context.clone()).await?;
+ let stream = join.execute(i, context.clone())?;
let more_batches = common::collect(stream).await?;
batches.extend(
more_batches
@@ -1446,7 +1444,7 @@ mod tests {
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
// first part
- let stream = join.execute(0, task_ctx.clone()).await?;
+ let stream = join.execute(0, task_ctx.clone())?;
let batches = common::collect(stream).await?;
assert_eq!(batches.len(), 1);
@@ -1460,7 +1458,7 @@ mod tests {
assert_batches_sorted_eq!(expected, &batches);
// second part
- let stream = join.execute(1, task_ctx.clone()).await?;
+ let stream = join.execute(1, task_ctx.clone())?;
let batches = common::collect(stream).await?;
assert_eq!(batches.len(), 1);
let expected = vec![
@@ -1513,7 +1511,7 @@ mod tests {
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
- let stream = join.execute(0, task_ctx).await.unwrap();
+ let stream = join.execute(0, task_ctx).unwrap();
let batches = common::collect(stream).await.unwrap();
let expected = vec![
@@ -1556,7 +1554,7 @@ mod tests {
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
- let stream = join.execute(0, task_ctx).await.unwrap();
+ let stream = join.execute(0, task_ctx).unwrap();
let batches = common::collect(stream).await.unwrap();
let expected = vec![
@@ -1597,7 +1595,7 @@ mod tests {
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
- let stream = join.execute(0, task_ctx).await.unwrap();
+ let stream = join.execute(0, task_ctx).unwrap();
let batches = common::collect(stream).await.unwrap();
let expected = vec![
@@ -1634,7 +1632,7 @@ mod tests {
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
- let stream = join.execute(0, task_ctx).await.unwrap();
+ let stream = join.execute(0, task_ctx).unwrap();
let batches = common::collect(stream).await.unwrap();
let expected = vec![
@@ -1762,7 +1760,7 @@ mod tests {
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1"]);
- let stream = join.execute(0, task_ctx).await?;
+ let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
let expected = vec![
@@ -1803,7 +1801,7 @@ mod tests {
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1"]);
- let stream = join.execute(0, task_ctx).await?;
+ let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
let expected = vec![
@@ -1921,7 +1919,7 @@ mod tests {
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
- let stream = join.execute(0, task_ctx).await?;
+ let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
let expected = vec![
@@ -2014,7 +2012,7 @@ mod tests {
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]);
- let stream = join.execute(0, task_ctx).await?;
+ let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
let expected = vec![
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 71e08afd2..00516db0b 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -43,7 +43,6 @@ use super::{
};
use crate::execution::context::TaskContext;
-use async_trait::async_trait;
/// Limit execution plan
#[derive(Debug)]
@@ -77,7 +76,6 @@ impl GlobalLimitExec {
}
}
-#[async_trait]
impl ExecutionPlan for GlobalLimitExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -127,7 +125,7 @@ impl ExecutionPlan for GlobalLimitExec {
)))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -152,7 +150,7 @@ impl ExecutionPlan for GlobalLimitExec {
}
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
- let stream = self.input.execute(0, context).await?;
+ let stream = self.input.execute(0, context)?;
Ok(Box::pin(LimitStream::new(
stream,
self.limit,
@@ -230,7 +228,6 @@ impl LocalLimitExec {
}
}
-#[async_trait]
impl ExecutionPlan for LocalLimitExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -282,14 +279,14 @@ impl ExecutionPlan for LocalLimitExec {
}
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
debug!("Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
- let stream = self.input.execute(partition, context).await?;
+ let stream = self.input.execute(partition, context)?;
Ok(Box::pin(LimitStream::new(
stream,
self.limit,
@@ -467,7 +464,7 @@ mod tests {
GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7);
// the result should contain 4 batches (one per input partition)
- let iter = limit.execute(0, task_ctx).await?;
+ let iter = limit.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;
// there should be a total of 100 rows
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index 2862aefdb..6a0af7650 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -33,7 +33,6 @@ use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use crate::execution::context::TaskContext;
-use async_trait::async_trait;
use datafusion_common::DataFusionError;
use futures::Stream;
@@ -57,7 +56,6 @@ impl fmt::Debug for MemoryExec {
}
}
-#[async_trait]
impl ExecutionPlan for MemoryExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -97,7 +95,7 @@ impl ExecutionPlan for MemoryExec {
)))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
@@ -279,7 +277,7 @@ mod tests {
);
// scan with projection
- let mut it = executor.execute(0, task_ctx).await?;
+ let mut it = executor.execute(0, task_ctx)?;
let batch2 = it.next().await.unwrap()?;
assert_eq!(2, batch2.schema().fields().len());
assert_eq!("c", batch2.schema().field(0).name());
@@ -329,7 +327,7 @@ mod tests {
])
);
- let mut it = executor.execute(0, task_ctx).await?;
+ let mut it = executor.execute(0, task_ctx)?;
let batch1 = it.next().await.unwrap()?;
assert_eq!(4, batch1.schema().fields().len());
assert_eq!(4, batch1.num_columns());
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index dc963c7e1..feb0bf322 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -29,7 +29,6 @@ use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
pub use datafusion_expr::Accumulator;
pub use datafusion_expr::ColumnarValue;
pub use display::DisplayFormatType;
@@ -128,7 +127,6 @@ pub struct ColumnStatistics {
/// [`ExecutionPlan`] can be displayed in a simplified form using the
/// return value from [`displayable`] in addition to the (normally
/// quite verbose) `Debug` output.
-#[async_trait]
pub trait ExecutionPlan: Debug + Send + Sync {
/// Returns the execution plan as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
@@ -223,7 +221,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
) -> Result<Arc<dyn ExecutionPlan>>;
/// creates an iterator
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -426,13 +424,13 @@ pub async fn execute_stream(
) -> Result<SendableRecordBatchStream> {
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
- 1 => plan.execute(0, context).await,
+ 1 => plan.execute(0, context),
_ => {
// merge into a single partition
let plan = CoalescePartitionsExec::new(plan.clone());
// CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
- plan.execute(0, context).await
+ plan.execute(0, context)
}
}
}
@@ -458,7 +456,7 @@ pub async fn execute_stream_partitioned(
let num_partitions = plan.output_partitioning().partition_count();
let mut streams = Vec::with_capacity(num_partitions);
for i in 0..num_partitions {
- streams.push(plan.execute(i, context.clone()).await?);
+ streams.push(plan.execute(i, context.clone())?);
}
Ok(streams)
}
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index b58029eb5..85fb7d424 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1510,7 +1510,6 @@ mod tests {
logical_plan::LogicalPlanBuilder, physical_plan::SendableRecordBatchStream,
};
use arrow::datatypes::{DataType, Field, SchemaRef};
- use async_trait::async_trait;
use datafusion_common::{DFField, DFSchema, DFSchemaRef};
use datafusion_expr::sum;
use datafusion_expr::{col, lit};
@@ -2000,7 +1999,6 @@ mod tests {
schema: SchemaRef,
}
- #[async_trait]
impl ExecutionPlan for NoOpExecutionPlan {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -2034,7 +2032,7 @@ mod tests {
unimplemented!("NoOpExecutionPlan::with_new_children");
}
- async fn execute(
+ fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 554dda12c..8e8a1ee54 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -39,7 +39,6 @@ use super::expressions::{Column, PhysicalSortExpr};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
use crate::execution::context::TaskContext;
-use async_trait::async_trait;
use futures::stream::Stream;
use futures::stream::StreamExt;
@@ -102,7 +101,6 @@ impl ProjectionExec {
}
}
-#[async_trait]
impl ExecutionPlan for ProjectionExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -146,7 +144,7 @@ impl ExecutionPlan for ProjectionExec {
)?))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -155,7 +153,7 @@ impl ExecutionPlan for ProjectionExec {
Ok(Box::pin(ProjectionStream {
schema: self.schema.clone(),
expr: self.expr.iter().map(|x| x.0.clone()).collect(),
- input: self.input.execute(partition, context).await?,
+ input: self.input.execute(partition, context)?,
baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
}))
}
@@ -345,7 +343,7 @@ mod tests {
let mut row_count = 0;
for partition in 0..projection.output_partitioning().partition_count() {
partition_count += 1;
- let stream = projection.execute(partition, task_ctx.clone()).await?;
+ let stream = projection.execute(partition, task_ctx.clone())?;
row_count += stream
.map(|batch| {
diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs
index 379555396..82efe2c4f 100644
--- a/datafusion/core/src/physical_plan/repartition.rs
+++ b/datafusion/core/src/physical_plan/repartition.rs
@@ -37,17 +37,14 @@ use super::common::{AbortOnDropMany, AbortOnDropSingle};
use super::expressions::PhysicalSortExpr;
use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream};
-use async_trait::async_trait;
use crate::execution::context::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use futures::stream::Stream;
use futures::StreamExt;
use hashbrown::HashMap;
-use tokio::sync::{
- mpsc::{self, UnboundedReceiver, UnboundedSender},
- Mutex,
-};
+use parking_lot::Mutex;
+use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
type MaybeBatch = Option<ArrowResult<RecordBatch>>;
@@ -261,7 +258,6 @@ impl RepartitionExec {
}
}
-#[async_trait]
impl ExecutionPlan for RepartitionExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -299,7 +295,7 @@ impl ExecutionPlan for RepartitionExec {
None
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -309,7 +305,7 @@ impl ExecutionPlan for RepartitionExec {
partition
);
// lock mutexes
- let mut state = self.state.lock().await;
+ let mut state = self.state.lock();
let num_input_partitions = self.input.output_partitioning().partition_count();
let num_output_partitions = self.partitioning.partition_count();
@@ -437,7 +433,7 @@ impl RepartitionExec {
// execute the child operator
let timer = r_metrics.fetch_time.timer();
- let mut stream = input.execute(i, context).await?;
+ let mut stream = input.execute(i, context)?;
timer.done();
// While there are still outputs to send to, keep
@@ -689,7 +685,7 @@ mod tests {
let mut output_partitions = vec![];
for i in 0..exec.partitioning.partition_count() {
// execute this *output* partition and collect all batches
- let mut stream = exec.execute(i, task_ctx.clone()).await?;
+ let mut stream = exec.execute(i, task_ctx.clone())?;
let mut batches = vec![];
while let Some(result) = stream.next().await {
batches.push(result?);
@@ -745,7 +741,7 @@ mod tests {
// returned and no results produced
let partitioning = Partitioning::UnknownPartitioning(1);
let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
- let output_stream = exec.execute(0, task_ctx).await.unwrap();
+ let output_stream = exec.execute(0, task_ctx).unwrap();
// Expect that an error is returned
let result_string = crate::physical_plan::common::collect(output_stream)
@@ -773,7 +769,7 @@ mod tests {
// Note: this should pass (the stream can be created) but the
// error when the input is executed should get passed back
- let output_stream = exec.execute(0, task_ctx).await.unwrap();
+ let output_stream = exec.execute(0, task_ctx).unwrap();
// Expect that an error is returned
let result_string = crate::physical_plan::common::collect(output_stream)
@@ -808,7 +804,7 @@ mod tests {
// Note: this should pass (the stream can be created) but the
// error when the input is executed should get passed back
- let output_stream = exec.execute(0, task_ctx).await.unwrap();
+ let output_stream = exec.execute(0, task_ctx).unwrap();
// Expect that an error is returned
let result_string = crate::physical_plan::common::collect(output_stream)
@@ -860,7 +856,7 @@ mod tests {
assert_batches_sorted_eq!(&expected, &expected_batches);
- let output_stream = exec.execute(0, task_ctx).await.unwrap();
+ let output_stream = exec.execute(0, task_ctx).unwrap();
let batches = crate::physical_plan::common::collect(output_stream)
.await
.unwrap();
@@ -880,8 +876,8 @@ mod tests {
// partition into two output streams
let exec = RepartitionExec::try_new(input.clone(), partitioning).unwrap();
- let output_stream0 = exec.execute(0, task_ctx.clone()).await.unwrap();
- let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap();
+ let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap();
+ let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap();
// now, purposely drop output stream 0
// *before* any outputs are produced
@@ -927,7 +923,7 @@ mod tests {
// We first collect the results without dropping the output stream.
let input = Arc::new(make_barrier_exec());
let exec = RepartitionExec::try_new(input.clone(), partitioning.clone()).unwrap();
- let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap();
+ let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap();
input.wait().await;
let batches_without_drop = crate::physical_plan::common::collect(output_stream1)
.await
@@ -947,8 +943,8 @@ mod tests {
// Now do the same but dropping the stream before waiting for the barrier
let input = Arc::new(make_barrier_exec());
let exec = RepartitionExec::try_new(input.clone(), partitioning).unwrap();
- let output_stream0 = exec.execute(0, task_ctx.clone()).await.unwrap();
- let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap();
+ let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap();
+ let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap();
// now, purposely drop output stream 0
// *before* any outputs are produced
std::mem::drop(output_stream0);
@@ -1053,11 +1049,11 @@ mod tests {
let schema = batch.schema();
let input = MockExec::new(vec![Ok(batch)], schema);
let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
- let output_stream0 = exec.execute(0, task_ctx.clone()).await.unwrap();
+ let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap();
let batch0 = crate::physical_plan::common::collect(output_stream0)
.await
.unwrap();
- let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap();
+ let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap();
let batch1 = crate::physical_plan::common::collect(output_stream1)
.await
.unwrap();
diff --git a/datafusion/core/src/physical_plan/sort_merge_join.rs b/datafusion/core/src/physical_plan/sort_merge_join.rs
index 765f3ba25..c207917b6 100644
--- a/datafusion/core/src/physical_plan/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/sort_merge_join.rs
@@ -32,7 +32,6 @@ use arrow::compute::{take, SortOptions};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
use futures::{Stream, StreamExt};
use crate::error::DataFusionError;
@@ -112,7 +111,6 @@ impl SortMergeJoinExec {
}
}
-#[async_trait]
impl ExecutionPlan for SortMergeJoinExec {
fn as_any(&self) -> &dyn Any {
self
@@ -153,7 +151,7 @@ impl ExecutionPlan for SortMergeJoinExec {
}
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -178,8 +176,8 @@ impl ExecutionPlan for SortMergeJoinExec {
};
// execute children plans
- let streamed = streamed.execute(partition, context.clone()).await?;
- let buffered = buffered.execute(partition, context.clone()).await?;
+ let streamed = streamed.execute(partition, context.clone())?;
+ let buffered = buffered.execute(partition, context.clone())?;
// create output buffer
let batch_size = context.session_config().batch_size;
@@ -1357,7 +1355,7 @@ mod tests {
)?;
let columns = columns(&join.schema());
- let stream = join.execute(0, task_ctx).await?;
+ let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
Ok((columns, batches))
}
@@ -1374,7 +1372,7 @@ mod tests {
let join = join(left, right, on, join_type)?;
let columns = columns(&join.schema());
- let stream = join.execute(0, task_ctx).await?;
+ let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
Ok((columns, batches))
}
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index d11a0cf29..3a0b5f2a1 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -694,7 +694,6 @@ impl SortExec {
}
}
-#[async_trait]
impl ExecutionPlan for SortExec {
fn as_any(&self) -> &dyn Any {
self
@@ -748,7 +747,7 @@ impl ExecutionPlan for SortExec {
)?))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -775,7 +774,7 @@ impl ExecutionPlan for SortExec {
partition
);
- let input = self.input.execute(partition, context.clone()).await?;
+ let input = self.input.execute(partition, context.clone())?;
debug!("End SortExec's input.execute for partition: {}", partition);
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 37ed4acb8..8e3326255 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -35,7 +35,6 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
-use async_trait::async_trait;
use futures::stream::{Fuse, FusedStream};
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
@@ -108,7 +107,6 @@ impl SortPreservingMergeExec {
}
}
-#[async_trait]
impl ExecutionPlan for SortPreservingMergeExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -150,7 +148,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
)))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -182,7 +180,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
)),
1 => {
// bypass if there is only one partition to merge (no metrics in this case either)
- let result = self.input.execute(0, context).await;
+ let result = self.input.execute(0, context);
debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input");
result
}
@@ -210,20 +208,13 @@ impl ExecutionPlan for SortPreservingMergeExec {
)
})
.collect(),
- Err(_) => {
- futures::future::try_join_all((0..input_partitions).map(
- |partition| {
- let context = context.clone();
- async move {
- self.input
- .execute(partition, context)
- .await
- .map(|stream| SortedStream::new(stream, 0))
- }
- },
- ))
- .await?
- }
+ Err(_) => (0..input_partitions)
+ .map(|partition| {
+ let stream =
+ self.input.execute(partition, context.clone())?;
+ Ok(SortedStream::new(stream, 0))
+ })
+ .collect::<Result<_>>()?,
};
debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute");
@@ -1209,7 +1200,7 @@ mod tests {
for partition in 0..partition_count {
let (sender, receiver) = mpsc::channel(1);
- let mut stream = batches.execute(partition, task_ctx.clone()).await.unwrap();
+ let mut stream = batches.execute(partition, task_ctx.clone()).unwrap();
let join_handle = tokio::spawn(async move {
while let Some(batch) = stream.next().await {
sender.send(batch).await.unwrap();
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index b794cad26..e89ac4a76 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -38,7 +38,6 @@ use crate::{
error::Result,
physical_plan::{expressions, metrics::BaselineMetrics},
};
-use async_trait::async_trait;
/// UNION ALL execution plan
#[derive(Debug)]
@@ -64,7 +63,6 @@ impl UnionExec {
}
}
-#[async_trait]
impl ExecutionPlan for UnionExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -107,7 +105,7 @@ impl ExecutionPlan for UnionExec {
Ok(Arc::new(UnionExec::new(children)))
}
- async fn execute(
+ fn execute(
&self,
mut partition: usize,
context: Arc<TaskContext>,
@@ -123,7 +121,7 @@ impl ExecutionPlan for UnionExec {
for input in self.inputs.iter() {
// Calculate whether partition belongs to the current partition
if partition < input.output_partitioning().partition_count() {
- let stream = input.execute(partition, context.clone()).await?;
+ let stream = input.execute(partition, context)?;
debug!("Found a Union partition to execute");
return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
} else {
diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index f2ba681ed..897936814 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -29,7 +29,6 @@ use crate::scalar::ScalarValue;
use arrow::array::new_null_array;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
use std::any::Any;
use std::sync::Arc;
@@ -96,7 +95,6 @@ impl ValuesExec {
}
}
-#[async_trait]
impl ExecutionPlan for ValuesExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -138,7 +136,7 @@ impl ExecutionPlan for ValuesExec {
}))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 8b545a12b..e9eac35a3 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -33,7 +33,6 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
-use async_trait::async_trait;
use futures::stream::Stream;
use futures::{ready, StreamExt};
use std::any::Any;
@@ -90,7 +89,6 @@ impl WindowAggExec {
}
}
-#[async_trait]
impl ExecutionPlan for WindowAggExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -148,12 +146,12 @@ impl ExecutionPlan for WindowAggExec {
)?))
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let input = self.input.execute(partition, context).await?;
+ let input = self.input.execute(partition, context)?;
let stream = Box::pin(WindowAggStream::new(
self.schema.clone(),
self.window_expr.clone(),
diff --git a/datafusion/core/src/scheduler/mod.rs b/datafusion/core/src/scheduler/mod.rs
index a765ddf83..4cdc34b72 100644
--- a/datafusion/core/src/scheduler/mod.rs
+++ b/datafusion/core/src/scheduler/mod.rs
@@ -76,7 +76,6 @@
use std::sync::Arc;
-use futures::stream::BoxStream;
use log::{debug, error};
use crate::error::Result;
diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs
index baf487d98..20e7c6e79 100644
--- a/datafusion/core/src/scheduler/pipeline/execution.rs
+++ b/datafusion/core/src/scheduler/pipeline/execution.rs
@@ -22,9 +22,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
-use arrow::error::ArrowError;
-use async_trait::async_trait;
-use futures::{Stream, StreamExt, TryStreamExt};
+use futures::{Stream, StreamExt};
use parking_lot::Mutex;
use crate::arrow::datatypes::SchemaRef;
@@ -39,7 +37,6 @@ use crate::physical_plan::{
};
use crate::scheduler::pipeline::Pipeline;
-use crate::scheduler::BoxStream;
/// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and
/// converts it to the push-based [`Pipeline`] interface
@@ -57,7 +54,7 @@ use crate::scheduler::BoxStream;
pub struct ExecutionPipeline {
proxied: Arc<dyn ExecutionPlan>,
inputs: Vec<Vec<Arc<Mutex<InputPartition>>>>,
- outputs: Vec<Mutex<BoxStream<'static, ArrowResult<RecordBatch>>>>,
+ outputs: Vec<Mutex<SendableRecordBatchStream>>,
}
impl std::fmt::Debug for ExecutionPipeline {
@@ -125,23 +122,8 @@ impl ExecutionPipeline {
// Construct the output streams
let output_count = proxied.output_partitioning().partition_count();
let outputs = (0..output_count)
- .map(|x| {
- let proxy_captured = proxied.clone();
- let task_captured = task_context.clone();
- let fut = async move {
- proxy_captured
- .execute(x, task_captured)
- .await
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))
- };
-
- // Use futures::stream::once to handle operators that perform computation
- // within `ExecutionPlan::execute`. If we evaluated these futures here
- // we could potentially block indefinitely waiting for inputs that will
- // never arrive as the query isn't scheduled yet
- Mutex::new(futures::stream::once(fut).try_flatten().boxed())
- })
- .collect();
+ .map(|x| proxied.execute(x, task_context.clone()).map(Mutex::new))
+ .collect::<Result<_>>()?;
Ok(Self {
proxied,
@@ -236,7 +218,6 @@ struct ProxyExecutionPlan {
inputs: Vec<Arc<Mutex<InputPartition>>>,
}
-#[async_trait]
impl ExecutionPlan for ProxyExecutionPlan {
fn as_any(&self) -> &dyn Any {
self
@@ -281,7 +262,7 @@ impl ExecutionPlan for ProxyExecutionPlan {
unimplemented!()
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index 7e0cbe35f..855bb3bbc 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -17,7 +17,6 @@
//! Simple iterator over batches for use in testing
-use async_trait::async_trait;
use std::{
any::Any,
pin::Pin,
@@ -138,7 +137,6 @@ impl MockExec {
}
}
-#[async_trait]
impl ExecutionPlan for MockExec {
fn as_any(&self) -> &dyn Any {
self
@@ -168,7 +166,7 @@ impl ExecutionPlan for MockExec {
}
/// Returns a stream which yields data
- async fn execute(
+ fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
@@ -277,7 +275,6 @@ impl BarrierExec {
}
}
-#[async_trait]
impl ExecutionPlan for BarrierExec {
fn as_any(&self) -> &dyn Any {
self
@@ -307,7 +304,7 @@ impl ExecutionPlan for BarrierExec {
}
/// Returns a stream which yields data
- async fn execute(
+ fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
@@ -378,7 +375,6 @@ impl ErrorExec {
}
}
-#[async_trait]
impl ExecutionPlan for ErrorExec {
fn as_any(&self) -> &dyn Any {
self
@@ -408,7 +404,7 @@ impl ExecutionPlan for ErrorExec {
}
/// Returns a stream which yields data
- async fn execute(
+ fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
@@ -458,7 +454,6 @@ impl StatisticsExec {
}
}
}
-#[async_trait]
impl ExecutionPlan for StatisticsExec {
fn as_any(&self) -> &dyn Any {
self
@@ -487,7 +482,7 @@ impl ExecutionPlan for StatisticsExec {
Ok(self)
}
- async fn execute(
+ fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
@@ -552,7 +547,6 @@ impl BlockingExec {
}
}
-#[async_trait]
impl ExecutionPlan for BlockingExec {
fn as_any(&self) -> &dyn Any {
self
@@ -585,7 +579,7 @@ impl ExecutionPlan for BlockingExec {
)))
}
- async fn execute(
+ fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs
index dbaaca206..81e4706de 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -98,31 +98,36 @@ impl Stream for TestCustomRecordBatchStream {
}
}
-#[async_trait]
impl ExecutionPlan for CustomExecutionPlan {
fn as_any(&self) -> &dyn Any {
self
}
+
fn schema(&self) -> SchemaRef {
let schema = TEST_CUSTOM_SCHEMA_REF!();
project_schema(&schema, self.projection.as_ref()).expect("projected schema")
}
+
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}
+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
+
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
- async fn execute(
+
+ fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs
index 664e77e18..49cd70143 100644
--- a/datafusion/core/tests/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/provider_filter_pushdown.rs
@@ -55,7 +55,6 @@ struct CustomPlan {
batches: Vec<Arc<RecordBatch>>,
}
-#[async_trait]
impl ExecutionPlan for CustomPlan {
fn as_any(&self) -> &dyn std::any::Any {
self
@@ -84,7 +83,7 @@ impl ExecutionPlan for CustomPlan {
unreachable!()
}
- async fn execute(
+ fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs
index d57c218d4..031506704 100644
--- a/datafusion/core/tests/statistics.rs
+++ b/datafusion/core/tests/statistics.rs
@@ -106,7 +106,6 @@ impl TableProvider for StatisticsValidation {
}
}
-#[async_trait]
impl ExecutionPlan for StatisticsValidation {
fn as_any(&self) -> &dyn Any {
self
@@ -135,7 +134,7 @@ impl ExecutionPlan for StatisticsValidation {
Ok(self)
}
- async fn execute(
+ fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 43e6eeacd..d062cf3a3 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -457,7 +457,7 @@ impl ExecutionPlan for TopKExec {
}
/// Execute one partition and return an iterator over RecordBatch
- async fn execute(
+ fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
@@ -470,7 +470,7 @@ impl ExecutionPlan for TopKExec {
}
Ok(Box::pin(TopKReader {
- input: self.input.execute(partition, context).await?,
+ input: self.input.execute(partition, context)?,
k: self.k,
done: false,
state: BTreeMap::new(),