You are viewing a plain text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/21 20:31:24 UTC

[arrow-datafusion] branch master updated: Don't sort batches during plan (#2312)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 54b15f690 Don't sort batches during plan (#2312)
54b15f690 is described below

commit 54b15f690cf6b5e855b16f7be1cfbc655b1c202f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Apr 21 21:31:20 2022 +0100

    Don't sort batches during plan (#2312)
    
    * Don't sort batches during plan
    
    * Use BoxStream instead
    
    * Eliminate double boxing of stream
---
 datafusion/core/src/physical_plan/sorts/sort.rs | 31 ++++++++-------
 datafusion/core/src/physical_plan/stream.rs     | 50 +++++++++++++++++++++++++
 2 files changed, 67 insertions(+), 14 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 8edd20a8b..b6e662a6e 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -32,7 +32,7 @@ use crate::physical_plan::metrics::{
 };
 use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
 use crate::physical_plan::sorts::SortedStream;
-use crate::physical_plan::stream::RecordBatchReceiverStream;
+use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
 use crate::physical_plan::{
     DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning,
     RecordBatchStream, SendableRecordBatchStream, Statistics,
@@ -42,12 +42,12 @@ use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array};
 pub use arrow::compute::SortOptions;
 use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
-use arrow::error::Result as ArrowResult;
+use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use futures::lock::Mutex;
-use futures::{Stream, StreamExt};
+use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
 use log::{debug, error};
 use std::any::Any;
 use std::cmp::min;
@@ -779,17 +779,20 @@ impl ExecutionPlan for SortExec {
 
         debug!("End SortExec's input.execute for partition: {}", partition);
 
-        let result = do_sort(
-            input,
-            partition,
-            self.expr.clone(),
-            self.metrics_set.clone(),
-            context,
-        )
-        .await;
-
-        debug!("End SortExec::execute for partition {}", partition);
-        result
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            futures::stream::once( // do_sort's future is only polled once this stream is polled
+                do_sort(
+                    input,
+                    partition,
+                    self.expr.clone(),
+                    self.metrics_set.clone(),
+                    context,
+                )
+                .map_err(|e| ArrowError::ExternalError(Box::new(e))),
+            )
+            .try_flatten(), // yield the batches of the stream that do_sort resolves to
+        )))
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs
index 67b709040..99209121f 100644
--- a/datafusion/core/src/physical_plan/stream.rs
+++ b/datafusion/core/src/physical_plan/stream.rs
@@ -21,6 +21,7 @@ use arrow::{
     datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
 };
 use futures::{Stream, StreamExt};
+use pin_project_lite::pin_project;
 use tokio::task::JoinHandle;
 use tokio_stream::wrappers::ReceiverStream;
 
@@ -73,3 +74,52 @@ impl RecordBatchStream for RecordBatchReceiverStream {
         self.schema.clone()
     }
 }
+
+pin_project! {
+    /// Pairs a [`Stream`] with a [`SchemaRef`] so that, when the stream yields
+    /// `ArrowResult<RecordBatch>` items, the pair implements [`RecordBatchStream`]
+    pub(crate) struct RecordBatchStreamAdapter<S> {
+        schema: SchemaRef,
+        // `#[pin]` lets `poll_next` project `Pin<&mut Self>` to `Pin<&mut S>`
+        #[pin]
+        stream: S,
+    }
+}
+
+impl<S> RecordBatchStreamAdapter<S> {
+    /// Creates an adapter reporting `schema`; it is not checked against `stream`'s batches
+    pub(crate) fn new(schema: SchemaRef, stream: S) -> Self {
+        Self { schema, stream }
+    }
+}
+
+impl<S> std::fmt::Debug for RecordBatchStreamAdapter<S> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RecordBatchStreamAdapter")
+            .field("schema", &self.schema)
+            .finish() // `stream` omitted so that `S` need not implement Debug
+    }
+}
+
+impl<S> Stream for RecordBatchStreamAdapter<S>
+where
+    S: Stream<Item = ArrowResult<RecordBatch>>,
+{
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.project().stream.poll_next(cx) // forward to the pinned inner stream
+    }
+}
+
+impl<S> RecordBatchStream for RecordBatchStreamAdapter<S>
+where
+    S: Stream<Item = ArrowResult<RecordBatch>>,
+{
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone() // the schema given to `new`, not derived from the stream
+    }
+}