Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/14 17:38:08 UTC

[arrow-datafusion] branch master updated: Make execute_stream functions sync (#4608)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 0baf5ef0c Make execute_stream functions sync (#4608)
0baf5ef0c is described below

commit 0baf5ef0c57dc723c09852fac0aadfd06102a094
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Dec 14 17:38:02 2022 +0000

    Make execute_stream functions sync (#4608)
---
 datafusion/core/src/dataframe.rs         | 4 ++--
 datafusion/core/src/physical_plan/mod.rs | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)
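
For context, here is a minimal caller-side sketch; it is not part of this commit. It assumes `plan` and `task_ctx` were built elsewhere, and the import paths reflect DataFusion as of this change and may differ in later releases. The point is that the physical_plan `execute_stream` function no longer needs to be awaited; only consuming the returned stream does.

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::execution::context::TaskContext;
    use datafusion::physical_plan::{execute_stream, ExecutionPlan};
    use futures::StreamExt;

    // Count the rows produced by an already-constructed physical plan.
    async fn count_rows(
        plan: Arc<dyn ExecutionPlan>,
        task_ctx: Arc<TaskContext>,
    ) -> Result<usize> {
        // Before this commit: let mut stream = execute_stream(plan, task_ctx).await?;
        // After: the call is synchronous and returns the stream directly.
        let mut stream = execute_stream(plan, task_ctx)?;
        let mut rows = 0;
        while let Some(batch) = stream.next().await {
            rows += batch?.num_rows();
        }
        Ok(rows)
    }

As the diff below shows, the DataFrame methods remain async only because creating the physical plan is still awaited; their final call simply drops the trailing `.await`.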

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index d7dd04f88..d8e93c760 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -457,7 +457,7 @@ impl DataFrame {
     pub async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
         let plan = self.create_physical_plan().await?;
         let task_ctx = Arc::new(TaskContext::from(&self.session_state.read().clone()));
-        execute_stream(plan, task_ctx).await
+        execute_stream(plan, task_ctx)
     }
 
     /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
@@ -498,7 +498,7 @@ impl DataFrame {
     ) -> Result<Vec<SendableRecordBatchStream>> {
         let plan = self.create_physical_plan().await?;
         let task_ctx = Arc::new(TaskContext::from(&self.session_state.read().clone()));
-        execute_stream_partitioned(plan, task_ctx).await
+        execute_stream_partitioned(plan, task_ctx)
     }
 
     /// Returns the schema describing the output of this DataFrame in terms of columns returned,
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index aa365ea45..24f5b2daa 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -406,12 +406,12 @@ pub async fn collect(
     plan: Arc<dyn ExecutionPlan>,
     context: Arc<TaskContext>,
 ) -> Result<Vec<RecordBatch>> {
-    let stream = execute_stream(plan, context).await?;
+    let stream = execute_stream(plan, context)?;
     common::collect(stream).await
 }
 
 /// Execute the [ExecutionPlan] and return a single stream of results
-pub async fn execute_stream(
+pub fn execute_stream(
     plan: Arc<dyn ExecutionPlan>,
     context: Arc<TaskContext>,
 ) -> Result<SendableRecordBatchStream> {
@@ -433,7 +433,7 @@ pub async fn collect_partitioned(
     plan: Arc<dyn ExecutionPlan>,
     context: Arc<TaskContext>,
 ) -> Result<Vec<Vec<RecordBatch>>> {
-    let streams = execute_stream_partitioned(plan, context).await?;
+    let streams = execute_stream_partitioned(plan, context)?;
     let mut batches = Vec::with_capacity(streams.len());
     for stream in streams {
         batches.push(common::collect(stream).await?);
@@ -442,7 +442,7 @@ pub async fn collect_partitioned(
 }
 
 /// Execute the [ExecutionPlan] and return a vec with one stream per output partition
-pub async fn execute_stream_partitioned(
+pub fn execute_stream_partitioned(
     plan: Arc<dyn ExecutionPlan>,
     context: Arc<TaskContext>,
 ) -> Result<Vec<SendableRecordBatchStream>> {