Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/04/03 16:46:50 UTC

[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #5851: Use SortPreservingMerge for in memory sort

tustvold commented on code in PR #5851:
URL: https://github.com/apache/arrow-datafusion/pull/5851#discussion_r1156209690


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -276,279 +276,42 @@ impl Debug for ExternalSorter {
 
 /// consume the non-empty `sorted_batches` and do in_mem_sort
 fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<BatchWithSortArray>,
+    buffered_batches: &mut Vec<RecordBatch>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
     batch_size: usize,
     tracking_metrics: MemTrackingMetrics,
-    fetch: Option<usize>,
+    _fetch: Option<usize>,
 ) -> Result<SendableRecordBatchStream> {
-    assert_ne!(buffered_batches.len(), 0);
-    if buffered_batches.len() == 1 {
-        let result = buffered_batches.pop();
-        Ok(Box::pin(SizedRecordBatchStream::new(
-            schema,
-            vec![Arc::new(result.unwrap().sorted_batch)],
-            tracking_metrics,
-        )))
-    } else {
-        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
-            buffered_batches
-                .drain(..)
-                .map(|b| {
-                    let BatchWithSortArray {
-                        sort_arrays,
-                        sorted_batch: batch,
-                    } = b;
-                    (sort_arrays, batch)
-                })
-                .unzip();
-
-        let sorted_iter = {
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
-        };
-        Ok(Box::pin(SortedSizedRecordBatchStream::new(
-            schema,
-            batches,
-            sorted_iter,
-            tracking_metrics,
-        )))
+    if buffered_batches.len() < 2 {
+        let batches: Vec<_> = buffered_batches.drain(..).collect();

Review Comment:
   It is worth highlighting why this fast path matters for the benchmarks, as there was much discussion of this on https://github.com/apache/arrow-datafusion/issues/5230
   
   The "preserve partitioning" benchmarks are set up to yield a single RecordBatch per partition, so they are effectively a special case where in_mem_partial_sort is a no-op.
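
To illustrate the special case, here is a minimal standalone sketch, not the code in this PR. It assumes each buffered batch is already sorted and stands in a `Vec<i32>` for a `RecordBatch`. `SortedRun`, `SortOutput`, and `in_mem_partial_sort_sketch` are hypothetical names, and the `BinaryHeap` merge is only a placeholder for the real merge step.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical stand-in for an already sorted `RecordBatch`.
type SortedRun = Vec<i32>;

/// Output of the sketch: the rows, plus whether a merge had to run.
#[derive(Debug, PartialEq)]
struct SortOutput {
    rows: Vec<i32>,
    merged: bool,
}

/// Drains `buffered` and returns its rows in sorted order.
/// Assumption: every run in `buffered` is individually sorted.
fn in_mem_partial_sort_sketch(buffered: &mut Vec<SortedRun>) -> SortOutput {
    if buffered.len() < 2 {
        // Fast path: zero or one run, so there is nothing to merge.
        // This is the case the single-batch-per-partition benchmarks hit.
        let rows: Vec<i32> = buffered.drain(..).flatten().collect();
        return SortOutput { rows, merged: false };
    }

    // General path: k-way merge of the sorted runs via a min-heap.
    let runs: Vec<SortedRun> = buffered.drain(..).collect();
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            // Heap entries are (value, run index, position within run).
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }

    let total: usize = runs.iter().map(|r| r.len()).sum();
    let mut rows = Vec::with_capacity(total);
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        rows.push(value);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    SortOutput { rows, merged: true }
}
```

Calling the sketch with a single run, such as `vec![vec![1, 3, 5]]`, returns the run unchanged with `merged == false`. A benchmark that only ever buffers one batch per partition therefore never exercises the merge path, whichever merge implementation sits behind it.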


