You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/21 15:26:50 UTC
[arrow-datafusion] branch master updated: Make SortPreservingMerge Usable Outside Tokio (#2201) (#2305)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 2ddaabb5f Make SortPreservingMerge Usable Outside Tokio (#2201) (#2305)
2ddaabb5f is described below
commit 2ddaabb5f060b788a8d65197733be11e2316aa57
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Apr 21 16:26:45 2022 +0100
Make SortPreservingMerge Usable Outside Tokio (#2201) (#2305)
* Make SortPreservingMerge Usable Outside Tokio (#2201)
* Workaround async_trait lifetime shenanigans
---
.../physical_plan/sorts/sort_preserving_merge.rs | 59 ++++++++++++++--------
1 file changed, 38 insertions(+), 21 deletions(-)
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 6b8367a53..37ed4acb8 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -187,27 +187,44 @@ impl ExecutionPlan for SortPreservingMergeExec {
result
}
_ => {
- let receivers = (0..input_partitions)
- .into_iter()
- .map(|part_i| {
- let (sender, receiver) = mpsc::channel(1);
- let join_handle = spawn_execution(
- self.input.clone(),
- sender,
- part_i,
- context.clone(),
- );
-
- SortedStream::new(
- RecordBatchReceiverStream::create(
- &schema,
- receiver,
- join_handle,
- ),
- 0,
- )
- })
- .collect();
+ // Use tokio only if running from a tokio context (#2201)
+ let receivers = match tokio::runtime::Handle::try_current() {
+ Ok(_) => (0..input_partitions)
+ .into_iter()
+ .map(|part_i| {
+ let (sender, receiver) = mpsc::channel(1);
+ let join_handle = spawn_execution(
+ self.input.clone(),
+ sender,
+ part_i,
+ context.clone(),
+ );
+
+ SortedStream::new(
+ RecordBatchReceiverStream::create(
+ &schema,
+ receiver,
+ join_handle,
+ ),
+ 0,
+ )
+ })
+ .collect(),
+ Err(_) => {
+ futures::future::try_join_all((0..input_partitions).map(
+ |partition| {
+ let context = context.clone();
+ async move {
+ self.input
+ .execute(partition, context)
+ .await
+ .map(|stream| SortedStream::new(stream, 0))
+ }
+ },
+ ))
+ .await?
+ }
+ };
debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute");