You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "comphead (via GitHub)" <gi...@apache.org> on 2023/04/30 17:14:44 UTC

[GitHub] [arrow-datafusion] comphead commented on a diff in pull request #6163: Adaptive in-memory sort (~2x faster) (#5879)

comphead commented on code in PR #6163:
URL: https://github.com/apache/arrow-datafusion/pull/6163#discussion_r1181264479


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -276,296 +311,50 @@ impl Debug for ExternalSorter {
     }
 }
 
-/// consume the non-empty `sorted_batches` and do in_mem_sort
-fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<BatchWithSortArray>,
-    schema: SchemaRef,
-    expressions: &[PhysicalSortExpr],
-    batch_size: usize,
-    tracking_metrics: MemTrackingMetrics,
+fn sort_batch_stream(
+    batch: RecordBatch,
+    expressions: Arc<[PhysicalSortExpr]>,
     fetch: Option<usize>,
+    mut tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
-    assert_ne!(buffered_batches.len(), 0);
-    if buffered_batches.len() == 1 {
-        let result = buffered_batches.pop();
-        Ok(Box::pin(SizedRecordBatchStream::new(
-            schema,
-            vec![Arc::new(result.unwrap().sorted_batch)],
-            tracking_metrics,
-        )))
-    } else {
-        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
-            buffered_batches
-                .drain(..)
-                .map(|b| {
-                    let BatchWithSortArray {
-                        sort_arrays,
-                        sorted_batch: batch,
-                    } = b;
-                    (sort_arrays, batch)
-                })
-                .unzip();
-
-        let sorted_iter = {
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
-        };
-        Ok(Box::pin(SortedSizedRecordBatchStream::new(
-            schema,
-            batches,
-            sorted_iter,
-            tracking_metrics,
-        )))
-    }
-}
-
-#[derive(Debug, Copy, Clone)]
-struct CompositeIndex {
-    batch_idx: u32,
-    row_idx: u32,
+    let schema = batch.schema();
+    tracking_metrics.init_mem_used(batch.get_array_memory_size());
+    let stream = futures::stream::once(futures::future::lazy(move |_| {
+        let sorted = sort_batch(&batch, &expressions, fetch)?;
+        tracking_metrics.record_output(sorted.num_rows());
+        Ok(sorted)
+    }));
+    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
 }
 
-/// Get sorted iterator by sort concatenated `SortColumn`s
-fn get_sorted_iter(
-    sort_arrays: &[Vec<ArrayRef>],
-    expr: &[PhysicalSortExpr],
-    batch_size: usize,
+fn sort_batch(
+    batch: &RecordBatch,
+    expressions: &[PhysicalSortExpr],
     fetch: Option<usize>,
-) -> Result<SortedIterator> {
-    let row_indices = sort_arrays
-        .iter()
-        .enumerate()
-        .flat_map(|(i, arrays)| {
-            (0..arrays[0].len()).map(move |r| CompositeIndex {
-                // since we original use UInt32Array to index the combined mono batch,
-                // component record batches won't overflow as well,
-                // use u32 here for space efficiency.
-                batch_idx: i as u32,
-                row_idx: r as u32,
-            })
-        })
-        .collect::<Vec<CompositeIndex>>();
-
-    let sort_columns = expr
+) -> Result<RecordBatch> {
+    let sort_columns = expressions
         .iter()
-        .enumerate()
-        .map(|(i, expr)| {
-            let columns_i = sort_arrays
-                .iter()
-                .map(|cs| cs[i].as_ref())
-                .collect::<Vec<&dyn Array>>();
-            Ok(SortColumn {
-                values: concat(columns_i.as_slice())?,
-                options: Some(expr.options),
-            })
-        })
+        .map(|expr| expr.evaluate_to_sort_column(batch))
         .collect::<Result<Vec<_>>>()?;
+
     let indices = lexsort_to_indices(&sort_columns, fetch)?;
 
-    // Calculate composite index based on sorted indices
-    let row_indices = indices
-        .values()
+    let columns = batch
+        .columns()
         .iter()
-        .map(|i| row_indices[*i as usize])
-        .collect();
-
-    Ok(SortedIterator::new(row_indices, batch_size))
-}
+        .map(|c| take(c.as_ref(), &indices, None))
+        .collect::<Result<_, _>>()?;
 
-struct SortedIterator {
-    /// Current logical position in the iterator
-    pos: usize,
-    /// Sorted composite index of where to find the rows in buffered batches
-    composite: Vec<CompositeIndex>,
-    /// Maximum batch size to produce
-    batch_size: usize,
+    Ok(RecordBatch::try_new(batch.schema(), columns)?)
 }
 
-impl SortedIterator {
-    fn new(composite: Vec<CompositeIndex>, batch_size: usize) -> Self {
-        Self {
-            pos: 0,
-            composite,
-            batch_size,
-        }
-    }
-
-    fn memory_size(&self) -> usize {
-        std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..])
-    }
-}
-
-impl Iterator for SortedIterator {
-    type Item = Vec<CompositeSlice>;
-
-    /// Emit a max of `batch_size` positions each time
-    fn next(&mut self) -> Option<Self::Item> {
-        let length = self.composite.len();
-        if self.pos >= length {
-            return None;
-        }
-
-        let current_size = min(self.batch_size, length - self.pos);
-
-        // Combine adjacent indexes from the same batch to make a slice,
-        // for more efficient `extend` later.
-        let mut last_batch_idx = self.composite[self.pos].batch_idx;
-        let mut indices_in_batch = Vec::with_capacity(current_size);
-
-        let mut slices = vec![];
-        for ci in &self.composite[self.pos..self.pos + current_size] {
-            if ci.batch_idx != last_batch_idx {
-                group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-                last_batch_idx = ci.batch_idx;
-            }
-            indices_in_batch.push(ci.row_idx);
-        }
-
-        assert!(
-            !indices_in_batch.is_empty(),
-            "There should have at least one record in a sort output slice."
-        );
-        group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-
-        self.pos += current_size;
-        Some(slices)
-    }
-}
-
-/// Group continuous indices into a slice for better `extend` performance
-fn group_indices(
-    batch_idx: u32,
-    positions: &mut Vec<u32>,
-    output: &mut Vec<CompositeSlice>,
-) {
-    positions.sort_unstable();
-    let mut last_pos = 0;
-    let mut run_length = 0;
-    for pos in positions.iter() {
-        if run_length == 0 {
-            last_pos = *pos;
-            run_length = 1;
-        } else if *pos == last_pos + 1 {
-            run_length += 1;
-            last_pos = *pos;
-        } else {
-            output.push(CompositeSlice {
-                batch_idx,
-                start_row_idx: last_pos + 1 - run_length,
-                len: run_length as usize,
-            });
-            last_pos = *pos;
-            run_length = 1;
-        }
-    }
-    assert!(
-        run_length > 0,
-        "There should have at least one record in a sort output slice."
-    );
-    output.push(CompositeSlice {
-        batch_idx,
-        start_row_idx: last_pos + 1 - run_length,
-        len: run_length as usize,
-    });
-    positions.clear()
-}
-
-/// Stream of sorted record batches
-struct SortedSizedRecordBatchStream {
-    schema: SchemaRef,
+async fn spill_sorted_batches(
     batches: Vec<RecordBatch>,

Review Comment:
   just for curiosity, why do you prefer vector instead of stream here? to avoid channels overheads?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org