You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/11/21 14:21:22 UTC

[GitHub] [arrow] alamb commented on a change in pull request #8730: ARROW-10673: [Rust] [DataFusion] Made sort not collect on `execute`.

alamb commented on a change in pull request #8730:
URL: https://github.com/apache/arrow/pull/8730#discussion_r528201530



##########
File path: rust/datafusion/src/physical_plan/sort.rs
##########
@@ -114,60 +121,128 @@ impl ExecutionPlan for SortExec {
                 "SortExec requires a single input partition".to_owned(),
             ));
         }
-        let it = self.input.execute(0).await?;
-        let batches = common::collect(it).await?;
-
-        // combine all record batches into one for each column
-        let combined_batch = RecordBatch::try_new(
-            self.schema(),
-            self.schema()
-                .fields()
-                .iter()
-                .enumerate()
-                .map(|(i, _)| -> Result<ArrayRef> {
-                    Ok(concat(
-                        &batches
-                            .iter()
-                            .map(|batch| batch.columns()[i].clone())
-                            .collect::<Vec<ArrayRef>>(),
-                    )?)
-                })
-                .collect::<Result<Vec<ArrayRef>>>()?,
-        )?;
+        let input = self.input.execute(0).await?;
 
-        // sort combined record batch
-        let indices = lexsort_to_indices(
-            &self
-                .expr
-                .iter()
-                .map(|e| e.evaluate_to_sort_column(&combined_batch))
-                .collect::<Result<Vec<SortColumn>>>()?,
-        )?;
+        Ok(Box::pin(SortStream::new(input, self.expr.clone())))
+    }
+}
 
-        // reorder all rows based on sorted indices
-        let sorted_batch = RecordBatch::try_new(
-            self.schema(),
-            combined_batch
-                .columns()
-                .iter()
-                .map(|column| -> Result<ArrayRef> {
-                    Ok(take(
-                        column,
-                        &indices,
-                        // disable bound check overhead since indices are already generated from
-                        // the same record batch
-                        Some(TakeOptions {
-                            check_bounds: false,
-                        }),
-                    )?)
-                })
-                .collect::<Result<Vec<ArrayRef>>>()?,
-        )?;
+fn sort_batches(
+    batches: &Vec<RecordBatch>,
+    schema: &SchemaRef,
+    expr: &[PhysicalSortExpr],
+) -> ArrowResult<RecordBatch> {
+    // combine all record batches into one for each column
+    let combined_batch = RecordBatch::try_new(
+        schema.clone(),
+        schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(i, _)| {
+                concat(
+                    &batches
+                        .iter()
+                        .map(|batch| batch.columns()[i].clone())
+                        .collect::<Vec<ArrayRef>>(),
+                )
+            })
+            .collect::<ArrowResult<Vec<ArrayRef>>>()?,
+    )?;
+
+    // sort combined record batch
+    let indices = lexsort_to_indices(
+        &expr
+            .iter()
+            .map(|e| e.evaluate_to_sort_column(&combined_batch))
+            .collect::<Result<Vec<SortColumn>>>()
+            .map_err(DataFusionError::into_arrow_external_error)?,
+    )?;
+
+    // reorder all rows based on sorted indices
+    RecordBatch::try_new(
+        schema.clone(),
+        combined_batch
+            .columns()
+            .iter()
+            .map(|column| {
+                take(
+                    column,
+                    &indices,
+                    // disable bound check overhead since indices are already generated from
+                    // the same record batch
+                    Some(TakeOptions {
+                        check_bounds: false,
+                    }),
+                )
+            })
+            .collect::<ArrowResult<Vec<ArrayRef>>>()?,
+    )
+}
+
+pin_project! {
+    struct SortStream {
+        #[pin]
+        output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
+        finished: bool,
+        schema: SchemaRef,
+    }
+}
+
+impl SortStream {
+    fn new(input: SendableRecordBatchStream, expr: Vec<PhysicalSortExpr>) -> Self {
+        let (tx, rx) = futures::channel::oneshot::channel();
+
+        let schema = input.schema();
+        tokio::spawn(async move {
+            let schema = input.schema();
+            let sorted_batch = common::collect(input)

Review comment:
       This is good -- it also only starts fetching the input when a call to `execute` is made (i.e., when the parent operator is ready for data from the sort).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org