Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/02 13:20:22 UTC
[arrow-datafusion] branch main updated: Adaptive in-memory sort (~2x faster) (#5879) (#6163)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9a0ab5f37f Adaptive in-memory sort (~2x faster) (#5879) (#6163)
9a0ab5f37f is described below
commit 9a0ab5f37f8528f8da057b3c211cdbd4dc609a33
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue May 2 14:20:17 2023 +0100
Adaptive in-memory sort (~2x faster) (#5879) (#6163)
* Adaptive in-memory sort (#5879)
* Clippy
* Fix test
* Clippy
* Update test_source_sorted_builtin
---
datafusion/core/src/physical_plan/sorts/sort.rs | 568 +++++++-----------------
datafusion/core/tests/order_spill_fuzz.rs | 2 +-
datafusion/core/tests/sql/window.rs | 2 +-
3 files changed, 151 insertions(+), 421 deletions(-)
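For orientation before the diff: the rewritten sorter now picks one of three strategies whenever its buffered batches need sorting. The following is a minimal sketch of that dispatch, not the committed code; `sort_one` and `merge_all` are hypothetical stand-ins for the real `sort_batch_stream` and `streaming_merge` that appear in the diff below.

    // Sketch only: `sort_one` and `merge_all` are hypothetical stand-ins.
    fn adaptive_sort(
        schema: &SchemaRef,
        batches: Vec<RecordBatch>,
        reserved_bytes: usize,
    ) -> Result<SendableRecordBatchStream> {
        assert!(!batches.is_empty());
        if batches.len() == 1 {
            // One batch: sort it directly, nothing to merge.
            return sort_one(batches.into_iter().next().unwrap());
        }
        if reserved_bytes < 1024 * 1024 {
            // Under ~1MB buffered: concatenate and sort once; cheaper than
            // a k-way merge at this scale.
            return sort_one(concat_batches(schema, &batches)?);
        }
        // Otherwise sort each batch independently, then streaming-merge the
        // sorted streams into a total order.
        let streams: Vec<_> = batches.into_iter().map(sort_one).collect::<Result<_>>()?;
        merge_all(schema.clone(), streams)
    }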
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 5f644b658b..9461f9689e 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -25,7 +25,7 @@ use crate::execution::memory_pool::{
human_readable_size, MemoryConsumer, MemoryReservation,
};
use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
+use crate::physical_plan::common::{batch_byte_size, IPCWriter};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet,
@@ -34,28 +34,24 @@ use crate::physical_plan::sorts::merge::streaming_merge;
use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use crate::physical_plan::{
DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning,
- RecordBatchStream, SendableRecordBatchStream, Statistics,
+ SendableRecordBatchStream, Statistics,
};
use crate::prelude::SessionConfig;
-use arrow::array::{make_array, Array, ArrayRef, MutableArrayData};
pub use arrow::compute::SortOptions;
-use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions};
+use arrow::compute::{concat_batches, lexsort_to_indices, take};
use arrow::datatypes::SchemaRef;
-use arrow::error::ArrowError;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::EquivalenceProperties;
-use futures::{Stream, StreamExt, TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
use log::{debug, error};
use std::any::Any;
-use std::cmp::{min, Ordering};
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use std::task::{Context, Poll};
use tempfile::NamedTempFile;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task;
@@ -71,10 +67,11 @@ use tokio::task;
/// 3. when input is exhausted, merge all in memory batches and spills to get a total order.
struct ExternalSorter {
schema: SchemaRef,
- in_mem_batches: Vec<BatchWithSortArray>,
+ in_mem_batches: Vec<RecordBatch>,
+ in_mem_batches_sorted: bool,
spills: Vec<NamedTempFile>,
/// Sort expressions
- expr: Vec<PhysicalSortExpr>,
+ expr: Arc<[PhysicalSortExpr]>,
session_config: Arc<SessionConfig>,
runtime: Arc<RuntimeEnv>,
metrics_set: CompositeMetricsSet,
@@ -103,8 +100,9 @@ impl ExternalSorter {
Self {
schema,
in_mem_batches: vec![],
+ in_mem_batches_sorted: true,
spills: vec![],
- expr,
+ expr: expr.into(),
session_config,
runtime,
metrics_set,
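A note on the `expr: Arc<[PhysicalSortExpr]>` change above: every per-batch sort stream created later needs its own handle on the sort expressions, and an `Arc` slice makes that clone a reference-count bump rather than a deep copy of the expression vector. Illustrated with a plain element type:

    use std::sync::Arc;

    fn main() {
        let expr: Arc<[i32]> = vec![1, 2, 3].into(); // one allocation
        let per_stream = expr.clone();               // O(1) pointer copy
        assert_eq!(&expr[..], &per_stream[..]);
    }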
@@ -115,47 +113,37 @@ impl ExternalSorter {
}
}
- async fn insert_batch(
- &mut self,
- input: RecordBatch,
- tracking_metrics: &MemTrackingMetrics,
- ) -> Result<()> {
- if input.num_rows() > 0 {
- let size = batch_byte_size(&input);
- if self.reservation.try_grow(size).is_err() {
+ /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
+ ///
+ /// Updates memory usage metrics, and possibly triggers spilling to disk
+ async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+ if input.num_rows() == 0 {
+ return Ok(());
+ }
+
+ let size = batch_byte_size(&input);
+ if self.reservation.try_grow(size).is_err() {
+ let before = self.reservation.size();
+ self.in_mem_sort().await?;
+ // Sorting may have freed memory, especially if fetch is not `None`
+ //
+ // As such we check again, and if the memory usage has dropped by
+ // a factor of 2, and we can allocate the necessary capacity,
+ // we don't spill
+ //
+ // The factor of 2 aims to avoid a degenerate case where the
+ // memory required for `fetch` is just under the memory available,
+ // causing repeated resorting of data
+ if self.reservation.size() > before / 2
+ || self.reservation.try_grow(size).is_err()
+ {
self.spill().await?;
self.reservation.try_grow(size)?
}
-
- self.metrics.mem_used().add(size);
- // NB timer records time taken on drop, so there are no
- // calls to `timer.done()` below.
- let _timer = tracking_metrics.elapsed_compute().timer();
- let partial = sort_batch(input, self.schema.clone(), &self.expr, self.fetch)?;
-
- // The resulting batch might be smaller (or larger, see #3747) than the input
- // batch due to either a propagated limit or the re-construction of arrays. So
- // for being reliable, we need to reflect the memory usage of the partial batch.
- let new_size = batch_byte_size(&partial.sorted_batch);
- match new_size.cmp(&size) {
- Ordering::Greater => {
- // We don't have to call try_grow here, since we have already used the
- // memory (so spilling right here wouldn't help at all for the current
- // operation). But we still have to record it so that other requesters
- // would know about this unexpected increase in memory consumption.
- let new_size_delta = new_size - size;
- self.reservation.grow(new_size_delta);
- self.metrics.mem_used().add(new_size_delta);
- }
- Ordering::Less => {
- let size_delta = size - new_size;
- self.reservation.shrink(size_delta);
- self.metrics.mem_used().sub(size_delta);
- }
- Ordering::Equal => {}
- }
- self.in_mem_batches.push(partial);
}
+ self.metrics.mem_used().add(size);
+ self.in_mem_batches.push(input);
+ self.in_mem_batches_sorted = false;
Ok(())
}
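The factor-of-2 check above is easiest to see with numbers. A hypothetical standalone form of the predicate, with the two regimes spelled out:

    // Spill unless the in-place sort at least halved memory use AND the
    // pending allocation now succeeds.
    fn must_spill(before: usize, after: usize, grow_succeeded: bool) -> bool {
        after > before / 2 || !grow_succeeded
    }

    fn main() {
        // Sorting with a `fetch` freed 100MB down to only 60MB: spill even
        // though the grow succeeded, to avoid resorting on every insert.
        assert!(must_spill(100 << 20, 60 << 20, true));
        // Freed 100MB down to 40MB and the grow succeeded: stay in memory.
        assert!(!must_spill(100 << 20, 40 << 20, true));
    }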
@@ -165,28 +153,18 @@ impl ExternalSorter {
/// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
fn sort(&mut self) -> Result<SendableRecordBatchStream> {
- let batch_size = self.session_config.batch_size();
-
if self.spilled_before() {
let intermediate_metrics = self
.metrics_set
.new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
- let mut merge_metrics = self
+
+ let merge_metrics = self
.metrics_set
.new_final_tracking(self.partition_id, &self.runtime.memory_pool);
let mut streams = vec![];
if !self.in_mem_batches.is_empty() {
- let in_mem_stream = in_mem_partial_sort(
- &mut self.in_mem_batches,
- self.schema.clone(),
- &self.expr,
- batch_size,
- intermediate_metrics,
- self.fetch,
- )?;
- // TODO: More accurate, dynamic memory accounting (#5885)
- merge_metrics.init_mem_used(self.reservation.free());
+ let in_mem_stream = self.in_mem_sort_stream(intermediate_metrics)?;
streams.push(in_mem_stream);
}
@@ -206,14 +184,7 @@ impl ExternalSorter {
let tracking_metrics = self
.metrics_set
.new_final_tracking(self.partition_id, &self.runtime.memory_pool);
- let result = in_mem_partial_sort(
- &mut self.in_mem_batches,
- self.schema.clone(),
- &self.expr,
- batch_size,
- tracking_metrics,
- self.fetch,
- );
+ let result = self.in_mem_sort_stream(tracking_metrics);
// Report to the memory manager we are no longer using memory
self.reservation.free();
result
@@ -242,28 +213,92 @@ impl ExternalSorter {
debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
- let tracking_metrics = self
- .metrics_set
- .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+ self.in_mem_sort().await?;
let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
- let stream = in_mem_partial_sort(
- &mut self.in_mem_batches,
- self.schema.clone(),
- &self.expr,
- self.session_config.batch_size(),
- tracking_metrics,
- self.fetch,
- );
-
- spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
- .await?;
+ let batches = std::mem::take(&mut self.in_mem_batches);
+ spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
self.reservation.free();
let used = self.metrics.mem_used().set(0);
self.metrics.record_spill(used);
self.spills.push(spillfile);
Ok(used)
}
+
+ /// Sorts the in_mem_batches in place
+ async fn in_mem_sort(&mut self) -> Result<()> {
+ if self.in_mem_batches_sorted {
+ return Ok(());
+ }
+
+ let tracking_metrics = self
+ .metrics_set
+ .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+
+ self.in_mem_batches = self
+ .in_mem_sort_stream(tracking_metrics)?
+ .try_collect()
+ .await?;
+
+ let size: usize = self
+ .in_mem_batches
+ .iter()
+ .map(|x| x.get_array_memory_size())
+ .sum();
+
+ self.metrics.mem_used().set(size);
+ self.reservation.resize(size);
+ self.in_mem_batches_sorted = true;
+ Ok(())
+ }
+
+ /// Consumes in_mem_batches returning a sorted stream
+ fn in_mem_sort_stream(
+ &mut self,
+ metrics: MemTrackingMetrics,
+ ) -> Result<SendableRecordBatchStream> {
+ assert_ne!(self.in_mem_batches.len(), 0);
+ if self.in_mem_batches.len() == 1 {
+ let batch = self.in_mem_batches.remove(0);
+ let stream =
+ sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?;
+ self.in_mem_batches.clear();
+ return Ok(stream);
+ }
+
+ // If less than 1MB of in-memory data, concatenate and sort in place
+ //
+ // This is a very rough heuristic and likely could be refined further
+ if self.reservation.size() < 1048576 {
+ // Concatenate memory batches together and sort
+ let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+ self.in_mem_batches.clear();
+ return sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics);
+ }
+
+ let streams = self
+ .in_mem_batches
+ .drain(..)
+ .map(|batch| {
+ let metrics = self.metrics_set.new_intermediate_tracking(
+ self.partition_id,
+ &self.runtime.memory_pool,
+ );
+ sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)
+ })
+ .collect::<Result<_>>()?;
+
+ // TODO: Run batch sorts concurrently (#6162)
+ // TODO: Pushdown fetch to streaming merge (#6000)
+
+ streaming_merge(
+ streams,
+ self.schema.clone(),
+ &self.expr,
+ metrics,
+ self.session_config.batch_size(),
+ )
+ }
}
impl Debug for ExternalSorter {
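The large deletion that follows removes the `CompositeIndex`/`MutableArrayData` gather machinery in favor of plain arrow kernels: `lexsort_to_indices` computes a sorted permutation and `take` applies it, with `fetch` acting as a limit pushed into the sort. A self-contained example of that kernel sequence against the arrow crate (the function and data are invented for illustration):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int32Array};
    use arrow::compute::{lexsort_to_indices, take, SortColumn, SortOptions};
    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;

    // Sort a batch by its first column, optionally keeping only the first
    // `fetch` rows of the sorted result.
    fn sort_by_first_column(
        batch: &RecordBatch,
        fetch: Option<usize>,
    ) -> Result<RecordBatch, ArrowError> {
        let sort_columns = vec![SortColumn {
            values: batch.column(0).clone(),
            options: Some(SortOptions::default()),
        }];
        let indices = lexsort_to_indices(&sort_columns, fetch)?;
        let columns = batch
            .columns()
            .iter()
            .map(|c| take(c.as_ref(), &indices, None))
            .collect::<Result<Vec<_>, _>>()?;
        RecordBatch::try_new(batch.schema(), columns)
    }

    fn main() -> Result<(), ArrowError> {
        let x: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 2]));
        let batch = RecordBatch::try_from_iter(vec![("x", x)])?;
        let sorted = sort_by_first_column(&batch, Some(2))?;
        assert_eq!(sorted.num_rows(), 2); // only the two smallest rows remain
        Ok(())
    }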
@@ -276,296 +311,50 @@ impl Debug for ExternalSorter {
}
}
-/// consume the non-empty `sorted_batches` and do in_mem_sort
-fn in_mem_partial_sort(
- buffered_batches: &mut Vec<BatchWithSortArray>,
- schema: SchemaRef,
- expressions: &[PhysicalSortExpr],
- batch_size: usize,
- tracking_metrics: MemTrackingMetrics,
+fn sort_batch_stream(
+ batch: RecordBatch,
+ expressions: Arc<[PhysicalSortExpr]>,
fetch: Option<usize>,
+ mut tracking_metrics: MemTrackingMetrics,
) -> Result<SendableRecordBatchStream> {
- assert_ne!(buffered_batches.len(), 0);
- if buffered_batches.len() == 1 {
- let result = buffered_batches.pop();
- Ok(Box::pin(SizedRecordBatchStream::new(
- schema,
- vec![Arc::new(result.unwrap().sorted_batch)],
- tracking_metrics,
- )))
- } else {
- let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
- buffered_batches
- .drain(..)
- .map(|b| {
- let BatchWithSortArray {
- sort_arrays,
- sorted_batch: batch,
- } = b;
- (sort_arrays, batch)
- })
- .unzip();
-
- let sorted_iter = {
- // NB timer records time taken on drop, so there are no
- // calls to `timer.done()` below.
- let _timer = tracking_metrics.elapsed_compute().timer();
- get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
- };
- Ok(Box::pin(SortedSizedRecordBatchStream::new(
- schema,
- batches,
- sorted_iter,
- tracking_metrics,
- )))
- }
-}
-
-#[derive(Debug, Copy, Clone)]
-struct CompositeIndex {
- batch_idx: u32,
- row_idx: u32,
+ let schema = batch.schema();
+ tracking_metrics.init_mem_used(batch.get_array_memory_size());
+ let stream = futures::stream::once(futures::future::lazy(move |_| {
+ let sorted = sort_batch(&batch, &expressions, fetch)?;
+ tracking_metrics.record_output(sorted.num_rows());
+ Ok(sorted)
+ }));
+ Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
-/// Get sorted iterator by sort concatenated `SortColumn`s
-fn get_sorted_iter(
- sort_arrays: &[Vec<ArrayRef>],
- expr: &[PhysicalSortExpr],
- batch_size: usize,
+fn sort_batch(
+ batch: &RecordBatch,
+ expressions: &[PhysicalSortExpr],
fetch: Option<usize>,
-) -> Result<SortedIterator> {
- let row_indices = sort_arrays
- .iter()
- .enumerate()
- .flat_map(|(i, arrays)| {
- (0..arrays[0].len()).map(move |r| CompositeIndex {
- // since we original use UInt32Array to index the combined mono batch,
- // component record batches won't overflow as well,
- // use u32 here for space efficiency.
- batch_idx: i as u32,
- row_idx: r as u32,
- })
- })
- .collect::<Vec<CompositeIndex>>();
-
- let sort_columns = expr
+) -> Result<RecordBatch> {
+ let sort_columns = expressions
.iter()
- .enumerate()
- .map(|(i, expr)| {
- let columns_i = sort_arrays
- .iter()
- .map(|cs| cs[i].as_ref())
- .collect::<Vec<&dyn Array>>();
- Ok(SortColumn {
- values: concat(columns_i.as_slice())?,
- options: Some(expr.options),
- })
- })
+ .map(|expr| expr.evaluate_to_sort_column(batch))
.collect::<Result<Vec<_>>>()?;
+
let indices = lexsort_to_indices(&sort_columns, fetch)?;
- // Calculate composite index based on sorted indices
- let row_indices = indices
- .values()
+ let columns = batch
+ .columns()
.iter()
- .map(|i| row_indices[*i as usize])
- .collect();
-
- Ok(SortedIterator::new(row_indices, batch_size))
-}
+ .map(|c| take(c.as_ref(), &indices, None))
+ .collect::<Result<_, _>>()?;
-struct SortedIterator {
- /// Current logical position in the iterator
- pos: usize,
- /// Sorted composite index of where to find the rows in buffered batches
- composite: Vec<CompositeIndex>,
- /// Maximum batch size to produce
- batch_size: usize,
+ Ok(RecordBatch::try_new(batch.schema(), columns)?)
}
-impl SortedIterator {
- fn new(composite: Vec<CompositeIndex>, batch_size: usize) -> Self {
- Self {
- pos: 0,
- composite,
- batch_size,
- }
- }
-
- fn memory_size(&self) -> usize {
- std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..])
- }
-}
-
-impl Iterator for SortedIterator {
- type Item = Vec<CompositeSlice>;
-
- /// Emit a max of `batch_size` positions each time
- fn next(&mut self) -> Option<Self::Item> {
- let length = self.composite.len();
- if self.pos >= length {
- return None;
- }
-
- let current_size = min(self.batch_size, length - self.pos);
-
- // Combine adjacent indexes from the same batch to make a slice,
- // for more efficient `extend` later.
- let mut last_batch_idx = self.composite[self.pos].batch_idx;
- let mut indices_in_batch = Vec::with_capacity(current_size);
-
- let mut slices = vec![];
- for ci in &self.composite[self.pos..self.pos + current_size] {
- if ci.batch_idx != last_batch_idx {
- group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
- last_batch_idx = ci.batch_idx;
- }
- indices_in_batch.push(ci.row_idx);
- }
-
- assert!(
- !indices_in_batch.is_empty(),
- "There should have at least one record in a sort output slice."
- );
- group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-
- self.pos += current_size;
- Some(slices)
- }
-}
-
-/// Group continuous indices into a slice for better `extend` performance
-fn group_indices(
- batch_idx: u32,
- positions: &mut Vec<u32>,
- output: &mut Vec<CompositeSlice>,
-) {
- positions.sort_unstable();
- let mut last_pos = 0;
- let mut run_length = 0;
- for pos in positions.iter() {
- if run_length == 0 {
- last_pos = *pos;
- run_length = 1;
- } else if *pos == last_pos + 1 {
- run_length += 1;
- last_pos = *pos;
- } else {
- output.push(CompositeSlice {
- batch_idx,
- start_row_idx: last_pos + 1 - run_length,
- len: run_length as usize,
- });
- last_pos = *pos;
- run_length = 1;
- }
- }
- assert!(
- run_length > 0,
- "There should have at least one record in a sort output slice."
- );
- output.push(CompositeSlice {
- batch_idx,
- start_row_idx: last_pos + 1 - run_length,
- len: run_length as usize,
- });
- positions.clear()
-}
-
-/// Stream of sorted record batches
-struct SortedSizedRecordBatchStream {
- schema: SchemaRef,
+async fn spill_sorted_batches(
batches: Vec<RecordBatch>,
- sorted_iter: SortedIterator,
- num_cols: usize,
- metrics: MemTrackingMetrics,
-}
-
-impl SortedSizedRecordBatchStream {
- /// new
- pub fn new(
- schema: SchemaRef,
- batches: Vec<RecordBatch>,
- sorted_iter: SortedIterator,
- mut metrics: MemTrackingMetrics,
- ) -> Self {
- let size = batches.iter().map(batch_byte_size).sum::<usize>()
- + sorted_iter.memory_size();
- metrics.init_mem_used(size);
- let num_cols = batches[0].num_columns();
- SortedSizedRecordBatchStream {
- schema,
- batches,
- sorted_iter,
- num_cols,
- metrics,
- }
- }
-}
-
-impl Stream for SortedSizedRecordBatchStream {
- type Item = Result<RecordBatch>;
-
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- match self.sorted_iter.next() {
- None => Poll::Ready(None),
- Some(slices) => {
- let num_rows = slices.iter().map(|s| s.len).sum();
- let output = (0..self.num_cols)
- .map(|i| {
- let arrays = self
- .batches
- .iter()
- .map(|b| b.column(i).to_data())
- .collect::<Vec<_>>();
- let arrays = arrays.iter().collect();
- let mut mutable = MutableArrayData::new(arrays, false, num_rows);
- for x in slices.iter() {
- mutable.extend(
- x.batch_idx as usize,
- x.start_row_idx as usize,
- x.start_row_idx as usize + x.len,
- );
- }
- make_array(mutable.freeze())
- })
- .collect::<Vec<_>>();
- let batch =
- RecordBatch::try_new(self.schema.clone(), output).map_err(Into::into);
- let poll = Poll::Ready(Some(batch));
- self.metrics.record_poll(poll)
- }
- }
- }
-}
-
-struct CompositeSlice {
- batch_idx: u32,
- start_row_idx: u32,
- len: usize,
-}
-
-impl RecordBatchStream for SortedSizedRecordBatchStream {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
-
-async fn spill_partial_sorted_stream(
- in_mem_stream: &mut SendableRecordBatchStream,
path: &Path,
schema: SchemaRef,
) -> Result<()> {
- let (sender, receiver) = tokio::sync::mpsc::channel(2);
let path: PathBuf = path.into();
- let handle = task::spawn_blocking(move || write_sorted(receiver, path, schema));
- while let Some(item) = in_mem_stream.next().await {
- sender.send(item).await.ok();
- }
- drop(sender);
+ let handle = task::spawn_blocking(move || write_sorted(batches, path, schema));
match handle.await {
Ok(r) => r,
Err(e) => Err(DataFusionError::Execution(format!(
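One pattern worth calling out in the hunk above: the spill path now materializes the sorted batches first and hands them to a blocking task, so the synchronous IPC file write never runs on an async worker thread. A hedged sketch of that shape with tokio, using plain strings and `std::fs` in place of record batches and the `IPCWriter`:

    use std::{fs::File, io::Write, path::PathBuf};
    use tokio::task;

    // Collect first, then write synchronously off the async runtime.
    async fn spill_blocking(lines: Vec<String>, path: PathBuf) -> std::io::Result<()> {
        task::spawn_blocking(move || {
            let mut file = File::create(path)?;
            for line in lines {
                writeln!(file, "{line}")?;
            }
            Ok(())
        })
        .await
        .expect("blocking spill task panicked")
    }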
@@ -593,13 +382,13 @@ fn read_spill_as_stream(
}
fn write_sorted(
- mut receiver: Receiver<Result<RecordBatch>>,
+ batches: Vec<RecordBatch>,
path: PathBuf,
schema: SchemaRef,
) -> Result<()> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
- while let Some(batch) = receiver.blocking_recv() {
- writer.write(&batch?)?;
+ for batch in batches {
+ writer.write(&batch)?;
}
writer.finish()?;
debug!(
@@ -845,63 +634,6 @@ impl ExecutionPlan for SortExec {
}
}
-struct BatchWithSortArray {
- sort_arrays: Vec<ArrayRef>,
- sorted_batch: RecordBatch,
-}
-
-fn sort_batch(
- batch: RecordBatch,
- schema: SchemaRef,
- expr: &[PhysicalSortExpr],
- fetch: Option<usize>,
-) -> Result<BatchWithSortArray> {
- let sort_columns = expr
- .iter()
- .map(|e| e.evaluate_to_sort_column(&batch))
- .collect::<Result<Vec<SortColumn>>>()?;
-
- let indices = lexsort_to_indices(&sort_columns, fetch)?;
-
- // reorder all rows based on sorted indices
- let sorted_batch = RecordBatch::try_new(
- schema,
- batch
- .columns()
- .iter()
- .map(|column| {
- take(
- column.as_ref(),
- &indices,
- // disable bound check overhead since indices are already generated from
- // the same record batch
- Some(TakeOptions {
- check_bounds: false,
- }),
- )
- })
- .collect::<Result<Vec<ArrayRef>, ArrowError>>()?,
- )?;
-
- let sort_arrays = sort_columns
- .into_iter()
- .map(|sc| {
- Ok(take(
- sc.values.as_ref(),
- &indices,
- Some(TakeOptions {
- check_bounds: false,
- }),
- )?)
- })
- .collect::<Result<Vec<_>>>()?;
-
- Ok(BatchWithSortArray {
- sort_arrays,
- sorted_batch,
- })
-}
-
async fn do_sort(
mut input: SendableRecordBatchStream,
partition_id: usize,
@@ -917,8 +649,6 @@ async fn do_sort(
context.task_id()
);
let schema = input.schema();
- let tracking_metrics =
- metrics_set.new_intermediate_tracking(partition_id, context.memory_pool());
let mut sorter = ExternalSorter::new(
partition_id,
schema.clone(),
@@ -930,7 +660,7 @@ async fn do_sort(
);
while let Some(batch) = input.next().await {
let batch = batch?;
- sorter.insert_batch(batch, &tracking_metrics).await?;
+ sorter.insert_batch(batch).await?;
}
let result = sorter.sort();
debug!(
@@ -1089,7 +819,7 @@ mod tests {
#[tokio::test]
async fn test_sort_fetch_memory_calculation() -> Result<()> {
// This test mirrors down the size from the example above.
- let avg_batch_size = 5000;
+ let avg_batch_size = 4000;
let partitions = 4;
// A tuple of (fetch, expect_spillage)
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index fdf69ec0d1..1f72e0fcb4 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -118,7 +118,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
RecordBatch::try_from_iter(vec![(
"x",
Arc::new(Int32Array::from_iter_values(
- std::iter::from_fn(|| Some(rng.gen())).take(to_read),
+ (0..to_read).map(|_| rng.gen()),
)) as ArrayRef,
)])
.unwrap(),
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 7e7ed031f1..f63395e455 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -205,8 +205,8 @@ mod tests {
"+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
"| fv1 | fv2 | lv1 | lv2 | nv1 | nv2 | rn1 | rn2 | rank1 | rank2 | dense_rank1 | dense_rank2 | lag1 | lag2 | lead1 | lead2 | fvr1 | fvr2 | lvr1 | lvr2 | lagr1 | lagr2 | leadr1 | leadr2 |",
"+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
- "| 289 | 266 | 305 | 305 | 305 | 278 | 99 | 99 | 99 | 99 | 86 | 86 | 296 | 291 | 296 | 1004 | 305 | 305 | 301 | 296 | 305 | 1002 | 305 | 286 |",
"| 289 | 269 | 305 | 305 | 305 | 283 | 100 | 100 | 99 | 99 | 86 | 86 | 301 | 296 | 301 | 1004 | 305 | 305 | 301 | 301 | 1001 | 1002 | 1001 | 289 |",
+ "| 289 | 266 | 305 | 305 | 305 | 278 | 99 | 99 | 99 | 99 | 86 | 86 | 296 | 291 | 296 | 1004 | 305 | 305 | 301 | 296 | 305 | 1002 | 305 | 286 |",
"| 289 | 261 | 296 | 301 | | 275 | 98 | 98 | 98 | 98 | 85 | 85 | 291 | 289 | 291 | 1004 | 305 | 305 | 296 | 291 | 301 | 305 | 301 | 283 |",
"| 286 | 259 | 291 | 296 | | 272 | 97 | 97 | 97 | 97 | 84 | 84 | 289 | 286 | 289 | 1004 | 305 | 305 | 291 | 289 | 296 | 301 | 296 | 278 |",
"| 275 | 254 | 289 | 291 | 289 | 269 | 96 | 96 | 96 | 96 | 83 | 83 | 286 | 283 | 286 | 305 | 305 | 305 | 289 | 286 | 291 | 296 | 291 | 275 |",