You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/21 15:26:50 UTC

[arrow-datafusion] branch master updated: Make SortPreservingMerge Usable Outside Tokio (#2201) (#2305)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ddaabb5f Make SortPreservingMerge Usable Outside Tokio (#2201) (#2305)
2ddaabb5f is described below

commit 2ddaabb5f060b788a8d65197733be11e2316aa57
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Apr 21 16:26:45 2022 +0100

    Make SortPreservingMerge Usable Outside Tokio (#2201) (#2305)
    
    * Make SortPreservingMerge Usable Outside Tokio (#2201)
    
    * Work around async_trait lifetime shenanigans
---
 .../physical_plan/sorts/sort_preserving_merge.rs   | 59 ++++++++++++++--------
 1 file changed, 38 insertions(+), 21 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 6b8367a53..37ed4acb8 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -187,27 +187,44 @@ impl ExecutionPlan for SortPreservingMergeExec {
                 result
             }
             _ => {
-                let receivers = (0..input_partitions)
-                    .into_iter()
-                    .map(|part_i| {
-                        let (sender, receiver) = mpsc::channel(1);
-                        let join_handle = spawn_execution(
-                            self.input.clone(),
-                            sender,
-                            part_i,
-                            context.clone(),
-                        );
-
-                        SortedStream::new(
-                            RecordBatchReceiverStream::create(
-                                &schema,
-                                receiver,
-                                join_handle,
-                            ),
-                            0,
-                        )
-                    })
-                    .collect();
+                // Use tokio only if running from a tokio context (#2201)
+                let receivers = match tokio::runtime::Handle::try_current() {
+                    Ok(_) => (0..input_partitions)
+                        .into_iter()
+                        .map(|part_i| {
+                            let (sender, receiver) = mpsc::channel(1);
+                            let join_handle = spawn_execution(
+                                self.input.clone(),
+                                sender,
+                                part_i,
+                                context.clone(),
+                            );
+
+                            SortedStream::new(
+                                RecordBatchReceiverStream::create(
+                                    &schema,
+                                    receiver,
+                                    join_handle,
+                                ),
+                                0,
+                            )
+                        })
+                        .collect(),
+                    Err(_) => {
+                        futures::future::try_join_all((0..input_partitions).map(
+                            |partition| {
+                                let context = context.clone();
+                                async move {
+                                    self.input
+                                        .execute(partition, context)
+                                        .await
+                                        .map(|stream| SortedStream::new(stream, 0))
+                                }
+                            },
+                        ))
+                        .await?
+                    }
+                };
 
                 debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute");