You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/04/25 07:43:27 UTC
[arrow-datafusion] branch main updated: Run collect_partitioned in separate tasks (#6109)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0d93dede46 Run collect_partitioned in separate tasks (#6109)
0d93dede46 is described below
commit 0d93dede46aae547d0ac167aa3e3e502efdffb97
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Apr 25 03:43:20 2023 -0400
Run collect_partitioned in separate tasks (#6109)
---
datafusion/core/src/datasource/memory.rs | 25 +++----------------------
datafusion/core/src/physical_plan/mod.rs | 23 +++++++++++++++++------
2 files changed, 20 insertions(+), 28 deletions(-)
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 27e27e85ad..ca083aebe3 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -17,7 +17,7 @@
//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use std::any::Any;
use std::sync::Arc;
@@ -26,17 +26,16 @@ use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_expr::LogicalPlan;
use tokio::sync::RwLock;
-use tokio::task;
use crate::datasource::{TableProvider, TableType};
use crate::error::{DataFusionError, Result};
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use crate::physical_plan::common;
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{collect_partitioned, common};
use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
/// In-memory data source for presenting a `Vec<RecordBatch>` as a
@@ -199,25 +198,7 @@ impl TableProvider for MemTable {
)?)
};
- // Get the task context from the session state.
- let task_ctx = state.task_ctx();
-
- // Execute the plan and collect the results into batches.
- let mut tasks = vec![];
- for idx in 0..plan.output_partitioning().partition_count() {
- let stream = plan.execute(idx, task_ctx.clone())?;
- let handle = task::spawn(async move {
- stream.try_collect().await.map_err(DataFusionError::from)
- });
- tasks.push(AbortOnDropSingle::new(handle));
- }
- let results = futures::future::join_all(tasks)
- .await
- .into_iter()
- .map(|result| {
- result.map_err(|e| DataFusionError::Execution(format!("{e}")))?
- })
- .collect::<Result<Vec<Vec<RecordBatch>>>>()?;
+ let results = collect_partitioned(plan, state.task_ctx()).await?;
// Write the results into the table.
let mut all_batches = self.batches.write().await;
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 9815d9491e..50c145095b 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -33,7 +33,7 @@ pub use datafusion_expr::Accumulator;
pub use datafusion_expr::ColumnarValue;
pub use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
pub use display::DisplayFormatType;
-use futures::stream::Stream;
+use futures::stream::{Stream, TryStreamExt};
use std::fmt;
use std::fmt::Debug;
@@ -443,11 +443,21 @@ pub async fn collect_partitioned(
context: Arc<TaskContext>,
) -> Result<Vec<Vec<RecordBatch>>> {
let streams = execute_stream_partitioned(plan, context)?;
- let mut batches = Vec::with_capacity(streams.len());
- for stream in streams {
- batches.push(common::collect(stream).await?);
- }
- Ok(batches)
+
+ // Execute the plan and collect the results into batches.
+ let handles = streams
+ .into_iter()
+ .enumerate()
+ .map(|(idx, stream)| async move {
+ let handle = tokio::task::spawn(stream.try_collect());
+ AbortOnDropSingle::new(handle).await.map_err(|e| {
+ DataFusionError::Execution(format!(
+ "collect_partitioned partition {idx} panicked: {e}"
+ ))
+ })?
+ });
+
+ futures::future::try_join_all(handles).await
}
/// Execute the [ExecutionPlan] and return a vec with one stream per output partition
@@ -665,6 +675,7 @@ pub mod values;
pub mod windows;
use crate::execution::context::TaskContext;
+use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
pub use datafusion_physical_expr::{