Posted to commits@arrow.apache.org by al...@apache.org on 2020/11/21 23:07:33 UTC

[arrow] branch master updated: ARROW-10673: [Rust] [DataFusion] Made sort not collect on `execute`.

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 37c02c1  ARROW-10673: [Rust] [DataFusion] Made sort not collect on `execute`.
37c02c1 is described below

commit 37c02c1b3d99e713430d30e7cbdbb7a599530fb9
Author: Jorge C. Leitao <jo...@gmail.com>
AuthorDate: Sat Nov 21 18:06:51 2020 -0500

    ARROW-10673: [Rust] [DataFusion] Made sort not collect on `execute`.
    
    Currently, we `collect` and `sort` the record batches from the input of `Sort` on `execute`. However, there is no need to do so: we can postpone that work to the stream, as we do for our aggregates. This allows executors to defer the heavy operation of collecting and sorting until they request the first (and only) batch from the stream.
    
    This PR does exactly that: it postpones the execution until it is needed.
    
    This code's design is based on HashAggregate, which also uses a oneshot channel (a minimal sketch of the pattern follows the diff below).
    
    Closes #8730 from jorgecarleitao/sort_post
    
    Authored-by: Jorge C. Leitao <jo...@gmail.com>
    Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
 rust/datafusion/src/physical_plan/sort.rs | 183 +++++++++++++++++++++---------
 1 file changed, 129 insertions(+), 54 deletions(-)

diff --git a/rust/datafusion/src/physical_plan/sort.rs b/rust/datafusion/src/physical_plan/sort.rs
index 0c3601a..c734e2a 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -18,17 +18,24 @@
 //! Defines the SORT plan
 
 use std::any::Any;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use futures::stream::Stream;
+use futures::Future;
+
+use pin_project_lite::pin_project;
 
-use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
+use arrow::{array::ArrayRef, error::ArrowError};
 
-use super::SendableRecordBatchStream;
+use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::common::SizedRecordBatchStream;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::{common, Distribution, ExecutionPlan, Partitioning};
 
@@ -114,60 +121,128 @@ impl ExecutionPlan for SortExec {
                 "SortExec requires a single input partition".to_owned(),
             ));
         }
-        let it = self.input.execute(0).await?;
-        let batches = common::collect(it).await?;
-
-        // combine all record batches into one for each column
-        let combined_batch = RecordBatch::try_new(
-            self.schema(),
-            self.schema()
-                .fields()
-                .iter()
-                .enumerate()
-                .map(|(i, _)| -> Result<ArrayRef> {
-                    Ok(concat(
-                        &batches
-                            .iter()
-                            .map(|batch| batch.columns()[i].clone())
-                            .collect::<Vec<ArrayRef>>(),
-                    )?)
-                })
-                .collect::<Result<Vec<ArrayRef>>>()?,
-        )?;
+        let input = self.input.execute(0).await?;
 
-        // sort combined record batch
-        let indices = lexsort_to_indices(
-            &self
-                .expr
-                .iter()
-                .map(|e| e.evaluate_to_sort_column(&combined_batch))
-                .collect::<Result<Vec<SortColumn>>>()?,
-        )?;
+        Ok(Box::pin(SortStream::new(input, self.expr.clone())))
+    }
+}
 
-        // reorder all rows based on sorted indices
-        let sorted_batch = RecordBatch::try_new(
-            self.schema(),
-            combined_batch
-                .columns()
-                .iter()
-                .map(|column| -> Result<ArrayRef> {
-                    Ok(take(
-                        column,
-                        &indices,
-                        // disable bound check overhead since indices are already generated from
-                        // the same record batch
-                        Some(TakeOptions {
-                            check_bounds: false,
-                        }),
-                    )?)
-                })
-                .collect::<Result<Vec<ArrayRef>>>()?,
-        )?;
+fn sort_batches(
+    batches: &Vec<RecordBatch>,
+    schema: &SchemaRef,
+    expr: &[PhysicalSortExpr],
+) -> ArrowResult<RecordBatch> {
+    // combine all record batches into one for each column
+    let combined_batch = RecordBatch::try_new(
+        schema.clone(),
+        schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(i, _)| {
+                concat(
+                    &batches
+                        .iter()
+                        .map(|batch| batch.columns()[i].clone())
+                        .collect::<Vec<ArrayRef>>(),
+                )
+            })
+            .collect::<ArrowResult<Vec<ArrayRef>>>()?,
+    )?;
+
+    // sort combined record batch
+    let indices = lexsort_to_indices(
+        &expr
+            .iter()
+            .map(|e| e.evaluate_to_sort_column(&combined_batch))
+            .collect::<Result<Vec<SortColumn>>>()
+            .map_err(DataFusionError::into_arrow_external_error)?,
+    )?;
+
+    // reorder all rows based on sorted indices
+    RecordBatch::try_new(
+        schema.clone(),
+        combined_batch
+            .columns()
+            .iter()
+            .map(|column| {
+                take(
+                    column,
+                    &indices,
+                    // disable bound check overhead since indices are already generated from
+                    // the same record batch
+                    Some(TakeOptions {
+                        check_bounds: false,
+                    }),
+                )
+            })
+            .collect::<ArrowResult<Vec<ArrayRef>>>()?,
+    )
+}
+
+pin_project! {
+    struct SortStream {
+        #[pin]
+        output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
+        finished: bool,
+        schema: SchemaRef,
+    }
+}
+
+impl SortStream {
+    fn new(input: SendableRecordBatchStream, expr: Vec<PhysicalSortExpr>) -> Self {
+        let (tx, rx) = futures::channel::oneshot::channel();
+
+        let schema = input.schema();
+        tokio::spawn(async move {
+            let schema = input.schema();
+            let sorted_batch = common::collect(input)
+                .await
+                .map_err(DataFusionError::into_arrow_external_error)
+                .and_then(move |batches| sort_batches(&batches, &schema, &expr));
+
+            tx.send(sorted_batch)
+        });
+
+        Self {
+            output: rx,
+            finished: false,
+            schema,
+        }
+    }
+}
+
+impl Stream for SortStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if self.finished {
+            return Poll::Ready(None);
+        }
+
+        // is the output ready?
+        let this = self.project();
+        let output_poll = this.output.poll(cx);
 
-        Ok(Box::pin(SizedRecordBatchStream::new(
-            self.schema(),
-            vec![Arc::new(sorted_batch)],
-        )))
+        match output_poll {
+            Poll::Ready(result) => {
+                *this.finished = true;
+
+                // check for error in receiving channel and unwrap actual result
+                let result = match result {
+                    Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving
+                    Ok(result) => result,
+                };
+                Poll::Ready(Some(result))
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl RecordBatchStream for SortStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
     }
 }
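
A minimal, self-contained sketch of the oneshot-channel deferral that the new SortStream follows: construction only spawns the heavy work on a background task and hands back a stream, and the collect-and-sort cost is paid when the consumer polls for the single result. The sketch assumes only the futures and tokio crates; DeferredStream, its Vec<i32> stand-in for record batches, and main are illustrative names, not DataFusion code.

// Illustrative only: `Vec<i32>` stands in for record batches and
// `DeferredStream` for the SortStream in the diff above.
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::channel::oneshot;
use futures::{Future, Stream, StreamExt};

/// Yields exactly one item: the result of work spawned at construction time.
struct DeferredStream {
    rx: oneshot::Receiver<Vec<i32>>,
    finished: bool,
}

impl DeferredStream {
    fn new(batches: Vec<Vec<i32>>) -> Self {
        let (tx, rx) = oneshot::channel();
        // Spawn the heavy work; `new` (the analogue of `execute`) returns immediately.
        tokio::spawn(async move {
            let mut all: Vec<i32> = batches.into_iter().flatten().collect();
            all.sort();
            let _ = tx.send(all);
        });
        Self { rx, finished: false }
    }
}

impl Stream for DeferredStream {
    type Item = Vec<i32>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }
        // The oneshot receiver is itself a Future; poll it until the task sends.
        let poll = Pin::new(&mut self.rx).poll(cx);
        match poll {
            Poll::Ready(result) => {
                self.finished = true;
                // A receive error means the sender was dropped; end the stream.
                Poll::Ready(result.ok())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

#[tokio::main]
async fn main() {
    let mut stream = DeferredStream::new(vec![vec![3, 1], vec![2]]);
    // The consumer only pays for the sort when it polls the stream.
    assert_eq!(stream.next().await, Some(vec![1, 2, 3]));
    assert_eq!(stream.next().await, None);
}

As in the diff, the stream owns only the receiving end of the channel plus a finished flag, so the caller of execute gets a stream back immediately and the expensive work runs at most once, when the first (and only) batch is requested.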