You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/04 20:51:16 UTC

[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #2132: Reduce SortExec memory usage by void constructing single huge batch

Dandandan commented on code in PR #2132:
URL: https://github.com/apache/arrow-datafusion/pull/2132#discussion_r842129926


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
 
 /// consume the non-empty `sorted_bathes` and do in_mem_sort
 fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<RecordBatch>,
+    buffered_batches: &mut Vec<BatchWithSortArray>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
+    batch_size: usize,
     tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
     assert_ne!(buffered_batches.len(), 0);
+    if buffered_batches.len() == 1 {
+        let result = buffered_batches.pop();
+        Ok(Box::pin(SizedRecordBatchStream::new(
+            schema,
+            vec![Arc::new(result.unwrap().sorted_batch)],
+            tracking_metrics,
+        )))
+    } else {
+        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+            buffered_batches
+                .drain(..)
+                .into_iter()
+                .map(|b| {
+                    let BatchWithSortArray {
+                        sort_arrays,
+                        sorted_batch: batch,
+                    } = b;
+                    (sort_arrays, batch)
+                })
+                .unzip();
+
+        let sorted_iter = {
+            // NB timer records time taken on drop, so there are no
+            // calls to `timer.done()` below.
+            let _timer = tracking_metrics.elapsed_compute().timer();
+            get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+        };
+        Ok(Box::pin(SortedSizedRecordBatchStream::new(
+            schema,
+            batches,
+            sorted_iter,
+            tracking_metrics,
+        )))
+    }
+}
 
-    let result = {
-        // NB timer records time taken on drop, so there are no
-        // calls to `timer.done()` below.
-        let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+    batch_idx: u32,
+    row_idx: u32,
+}
 
-        let pre_sort = if buffered_batches.len() == 1 {
-            buffered_batches.pop()
-        } else {
-            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
-            // combine all record batches into one for each column
-            common::combine_batches(&batches, schema.clone())?
-        };
+/// Get sorted iterator by sort concatenated `SortColumn`s
+fn get_sorted_iter(
+    sort_arrays: &[Vec<ArrayRef>],
+    expr: &[PhysicalSortExpr],
+    batch_size: usize,
+) -> Result<SortedIterator> {
+    let row_indices = sort_arrays
+        .iter()
+        .enumerate()
+        .flat_map(|(i, arrays)| {
+            (0..arrays[0].len())
+                .map(|r| CompositeIndex {
+                    // since we original use UInt32Array to index the combined mono batch,
+                    // component record batches won't overflow as well,
+                    // use u32 here for space efficiency.
+                    batch_idx: i as u32,
+                    row_idx: r as u32,
+                })
+                .collect::<Vec<CompositeIndex>>()

Review Comment:
   Doing a collect here while flattening it later seems doing some extra allocations. I think we should be able to generate this in one go?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org