Posted to commits@arrow.apache.org by dh...@apache.org on 2023/04/25 07:43:27 UTC

[arrow-datafusion] branch main updated: Run collect_partitioned in separate tasks (#6109)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d93dede46 Run collect_partitioned in separate tasks (#6109)
0d93dede46 is described below

commit 0d93dede46aae547d0ac167aa3e3e502efdffb97
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Apr 25 03:43:20 2023 -0400

    Run collect_partitioned in separate tasks (#6109)
---
 datafusion/core/src/datasource/memory.rs | 25 +++----------------------
 datafusion/core/src/physical_plan/mod.rs | 23 +++++++++++++++++------
 2 files changed, 20 insertions(+), 28 deletions(-)
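
In effect, the commit lifts the hand-rolled per-partition task spawning out of MemTable::insert_into and into collect_partitioned itself, so every caller now drains all output partitions concurrently instead of one after another. Below is a minimal sketch of that spawn-and-join pattern, assuming the external datafusion, futures, and tokio crates; the function name collect_all_concurrently is illustrative only, and the committed code (shown in the diff that follows) additionally wraps each handle in AbortOnDropSingle so the tasks are aborted if the caller's future is dropped.

    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::{DataFusionError, Result};
    use datafusion::physical_plan::SendableRecordBatchStream;
    use futures::TryStreamExt;

    // Collect every partition stream in its own Tokio task so the partitions
    // are polled in parallel; a JoinError (panic or cancellation) is surfaced
    // as a DataFusionError::Execution.
    async fn collect_all_concurrently(
        streams: Vec<SendableRecordBatchStream>,
    ) -> Result<Vec<Vec<RecordBatch>>> {
        // Spawning eagerly here starts all partitions before any is awaited.
        let handles: Vec<_> = streams
            .into_iter()
            .map(|stream| tokio::task::spawn(stream.try_collect::<Vec<_>>()))
            .collect();

        let mut partitions = Vec::with_capacity(handles.len());
        for handle in handles {
            let batches = handle
                .await
                .map_err(|e| DataFusionError::Execution(format!("task panicked: {e}")))??;
            partitions.push(batches);
        }
        Ok(partitions)
    }
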

diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 27e27e85ad..ca083aebe3 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -17,7 +17,7 @@
 
 //! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
 
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use std::any::Any;
 use std::sync::Arc;
 
@@ -26,17 +26,16 @@ use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use datafusion_expr::LogicalPlan;
 use tokio::sync::RwLock;
-use tokio::task;
 
 use crate::datasource::{TableProvider, TableType};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use crate::physical_plan::common;
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::memory::MemoryExec;
 use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{collect_partitioned, common};
 use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
 
 /// In-memory data source for presenting a `Vec<RecordBatch>` as a
@@ -199,25 +198,7 @@ impl TableProvider for MemTable {
             )?)
         };
 
-        // Get the task context from the session state.
-        let task_ctx = state.task_ctx();
-
-        // Execute the plan and collect the results into batches.
-        let mut tasks = vec![];
-        for idx in 0..plan.output_partitioning().partition_count() {
-            let stream = plan.execute(idx, task_ctx.clone())?;
-            let handle = task::spawn(async move {
-                stream.try_collect().await.map_err(DataFusionError::from)
-            });
-            tasks.push(AbortOnDropSingle::new(handle));
-        }
-        let results = futures::future::join_all(tasks)
-            .await
-            .into_iter()
-            .map(|result| {
-                result.map_err(|e| DataFusionError::Execution(format!("{e}")))?
-            })
-            .collect::<Result<Vec<Vec<RecordBatch>>>>()?;
+        let results = collect_partitioned(plan, state.task_ctx()).await?;
 
         // Write the results into the table.
         let mut all_batches = self.batches.write().await;
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 9815d9491e..50c145095b 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -33,7 +33,7 @@ pub use datafusion_expr::Accumulator;
 pub use datafusion_expr::ColumnarValue;
 pub use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
 pub use display::DisplayFormatType;
-use futures::stream::Stream;
+use futures::stream::{Stream, TryStreamExt};
 use std::fmt;
 use std::fmt::Debug;
 
@@ -443,11 +443,21 @@ pub async fn collect_partitioned(
     context: Arc<TaskContext>,
 ) -> Result<Vec<Vec<RecordBatch>>> {
     let streams = execute_stream_partitioned(plan, context)?;
-    let mut batches = Vec::with_capacity(streams.len());
-    for stream in streams {
-        batches.push(common::collect(stream).await?);
-    }
-    Ok(batches)
+
+    // Execute the plan and collect the results into batches.
+    let handles = streams
+        .into_iter()
+        .enumerate()
+        .map(|(idx, stream)| async move {
+            let handle = tokio::task::spawn(stream.try_collect());
+            AbortOnDropSingle::new(handle).await.map_err(|e| {
+                DataFusionError::Execution(format!(
+                    "collect_partitioned partition {idx} panicked: {e}"
+                ))
+            })?
+        });
+
+    futures::future::try_join_all(handles).await
 }
 
 /// Execute the [ExecutionPlan] and return a vec with one stream per output partition
@@ -665,6 +675,7 @@ pub mod values;
 pub mod windows;
 
 use crate::execution::context::TaskContext;
+use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 pub use datafusion_physical_expr::{
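
As a usage illustration (not part of the commit): after this change, collect_partitioned drains each output partition in its own Tokio task, so a multi-partition plan such as a two-partition MemoryExec is collected concurrently. A minimal, self-contained sketch against the public datafusion API, assuming tokio with the macros and rt-multi-thread features; the column name "a" and the two single-batch partitions are arbitrary.

    use std::sync::Arc;

    use datafusion::arrow::array::Int32Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::physical_plan::{collect_partitioned, ExecutionPlan};
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        // Two output partitions, one batch each; both streams are now
        // collected in separate Tokio tasks rather than sequentially.
        let plan: Arc<dyn ExecutionPlan> = Arc::new(MemoryExec::try_new(
            &[vec![batch.clone()], vec![batch]],
            schema,
            None,
        )?);

        let ctx = SessionContext::new();
        let partitions = collect_partitioned(plan, ctx.task_ctx()).await?;
        assert_eq!(partitions.len(), 2);
        Ok(())
    }
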