Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/05 00:54:52 UTC

[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2132: Reduce SortExec memory usage by avoiding constructing single huge batch

yjshen commented on code in PR #2132:
URL: https://github.com/apache/arrow-datafusion/pull/2132#discussion_r842248683


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -271,33 +291,212 @@ fn in_mem_partial_sort(
     buffered_batches: &mut Vec<RecordBatch>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
+    batch_size: usize,
     tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
     assert_ne!(buffered_batches.len(), 0);
+    if buffered_batches.len() == 1 {
+        let result = buffered_batches.pop();
+        Ok(Box::pin(SizedRecordBatchStream::new(
+            schema,
+            vec![Arc::new(result.unwrap())],
+            tracking_metrics,
+        )))
+    } else {
+        let batches = buffered_batches.drain(..).collect::<Vec<_>>();
+        let sorted_iter = {
+            // NB timer records time taken on drop, so there are no
+            // calls to `timer.done()` below.
+            let _timer = tracking_metrics.elapsed_compute().timer();
+            get_sorted_iter(&batches, expressions, batch_size)?
+        };
+        Ok(Box::pin(SortedSizedRecordBatchStream::new(
+            schema,
+            batches,
+            sorted_iter,
+            tracking_metrics,
+        )))
+    }
+}
 
-    let result = {
-        // NB timer records time taken on drop, so there are no
-        // calls to `timer.done()` below.
-        let _timer = tracking_metrics.elapsed_compute().timer();
+fn get_sorted_iter(

Review Comment:
   I think in both cases (this PR and the current master), we need to concat the sort columns before running `lexsort`. On the current master, that concat is done while constructing the single huge record batch.
   
   For `select a, b from table order by a, b`, this PR does consume extra memory for the `Vec<CompositeIndex>`, but it also avoids calling `take` on huge arrays to do the actual reordering. So I think the behavior of this PR is consistent across cases where the mix of sort columns and payload columns varies?
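
   To make the trade-off concrete, here is a minimal std-only sketch of the idea, not the PR's actual code. `Batch`, `sorted_indices`, and `emit_chunks` are hypothetical names, and the two-field layout of `CompositeIndex` is an assumption for illustration. Only the sort keys are gathered and sorted; payload rows are copied later, one `batch_size` chunk at a time.

```rust
// Hypothetical stand-in for a record batch: one sort-key column and one
// payload column. This is NOT DataFusion's RecordBatch.
struct Batch {
    key: Vec<i32>,
    payload: Vec<String>,
}

// Assumed layout for illustration: which buffered batch a row came from,
// and its row offset inside that batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CompositeIndex {
    batch_idx: u32,
    row_idx: u32,
}

// Gather only the sort keys from every batch, tagged with their origin,
// sort them, and return the resulting row order. Payload columns are not
// touched here, so no single huge batch is built.
fn sorted_indices(batches: &[Batch]) -> Vec<CompositeIndex> {
    let mut keyed: Vec<(i32, CompositeIndex)> = Vec::new();
    for (b, batch) in batches.iter().enumerate() {
        for (r, k) in batch.key.iter().enumerate() {
            let idx = CompositeIndex { batch_idx: b as u32, row_idx: r as u32 };
            keyed.push((*k, idx));
        }
    }
    // Stable sort on the key only; ties keep their arrival order.
    keyed.sort_by_key(|(k, _)| *k);
    keyed.into_iter().map(|(_, idx)| idx).collect()
}

// Materialize the payload lazily, `batch_size` rows at a time, by looking
// each row up through its composite index. `batch_size` must be non-zero,
// because `chunks` panics on a chunk size of 0.
fn emit_chunks(
    batches: &[Batch],
    order: &[CompositeIndex],
    batch_size: usize,
) -> Vec<Vec<String>> {
    order
        .chunks(batch_size)
        .map(|chunk| {
            chunk
                .iter()
                .map(|i| batches[i.batch_idx as usize].payload[i.row_idx as usize].clone())
                .collect()
        })
        .collect()
}
```

   In this sketch the extra cost is one `CompositeIndex` per row regardless of how many payload columns there are, which is the sense in which the behavior stays the same as the column mix changes.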
   
   


