You are viewing a plain-text version of this content. The canonical version was linked from the original page (the link, labelled "here", is not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/21 20:31:24 UTC
[arrow-datafusion] branch master updated: Don't sort batches during plan (#2312)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 54b15f690 Don't sort batches during plan (#2312)
54b15f690 is described below
commit 54b15f690cf6b5e855b16f7be1cfbc655b1c202f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Apr 21 21:31:20 2022 +0100
Don't sort batches during plan (#2312)
* Don't sort batches during plan
* Use BoxStream instead
* Eliminate double boxing of stream
---
datafusion/core/src/physical_plan/sorts/sort.rs | 31 ++++++++-------
datafusion/core/src/physical_plan/stream.rs | 50 +++++++++++++++++++++++++
2 files changed, 67 insertions(+), 14 deletions(-)
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 8edd20a8b..b6e662a6e 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -32,7 +32,7 @@ use crate::physical_plan::metrics::{
};
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
use crate::physical_plan::sorts::SortedStream;
-use crate::physical_plan::stream::RecordBatchReceiverStream;
+use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use crate::physical_plan::{
DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
@@ -42,12 +42,12 @@ use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array};
pub use arrow::compute::SortOptions;
use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions};
use arrow::datatypes::SchemaRef;
-use arrow::error::Result as ArrowResult;
+use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use futures::lock::Mutex;
-use futures::{Stream, StreamExt};
+use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use log::{debug, error};
use std::any::Any;
use std::cmp::min;
@@ -779,17 +779,20 @@ impl ExecutionPlan for SortExec {
debug!("End SortExec's input.execute for partition: {}", partition);
- let result = do_sort(
- input,
- partition,
- self.expr.clone(),
- self.metrics_set.clone(),
- context,
- )
- .await;
-
- debug!("End SortExec::execute for partition {}", partition);
- result
+ Ok(Box::pin(RecordBatchStreamAdapter::new(
+ self.schema(),
+ futures::stream::once(
+ do_sort(
+ input,
+ partition,
+ self.expr.clone(),
+ self.metrics_set.clone(),
+ context,
+ )
+ .map_err(|e| ArrowError::ExternalError(Box::new(e))),
+ )
+ .try_flatten(),
+ )))
}
fn metrics(&self) -> Option<MetricsSet> {
diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs
index 67b709040..99209121f 100644
--- a/datafusion/core/src/physical_plan/stream.rs
+++ b/datafusion/core/src/physical_plan/stream.rs
@@ -21,6 +21,7 @@ use arrow::{
datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
};
use futures::{Stream, StreamExt};
+use pin_project_lite::pin_project;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
@@ -73,3 +74,52 @@ impl RecordBatchStream for RecordBatchReceiverStream {
self.schema.clone()
}
}
+
+pin_project! {
+ /// Combines a [`Stream`] with a [`SchemaRef`] implementing
+ /// [`RecordBatchStream`] for the combination
+ pub(crate) struct RecordBatchStreamAdapter<S> {
+ schema: SchemaRef,
+
+ #[pin]
+ stream: S,
+ }
+}
+
+impl<S> RecordBatchStreamAdapter<S> {
+ /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream
+ pub(crate) fn new(schema: SchemaRef, stream: S) -> Self {
+ Self { schema, stream }
+ }
+}
+
+impl<S> std::fmt::Debug for RecordBatchStreamAdapter<S> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("RecordBatchStreamAdapter")
+ .field("schema", &self.schema)
+ .finish()
+ }
+}
+
+impl<S> Stream for RecordBatchStreamAdapter<S>
+where
+ S: Stream<Item = ArrowResult<RecordBatch>>,
+{
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ self.project().stream.poll_next(cx)
+ }
+}
+
+impl<S> RecordBatchStream for RecordBatchStreamAdapter<S>
+where
+ S: Stream<Item = ArrowResult<RecordBatch>>,
+{
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}