Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/02 13:20:22 UTC

[arrow-datafusion] branch main updated: Adaptive in-memory sort (~2x faster) (#5879) (#6163)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a0ab5f37f Adaptive in-memory sort (~2x faster) (#5879) (#6163)
9a0ab5f37f is described below

commit 9a0ab5f37f8528f8da057b3c211cdbd4dc609a33
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue May 2 14:20:17 2023 +0100

    Adaptive in-memory sort (~2x faster) (#5879) (#6163)
    
    * Adaptive in-memory sort (#5879)
    
    * Clippy
    
    * Fix test
    
    * Clippy
    
    * Update test_source_sorted_builtin
---
 datafusion/core/src/physical_plan/sorts/sort.rs | 568 +++++++-----------------
 datafusion/core/tests/order_spill_fuzz.rs       |   2 +-
 datafusion/core/tests/sql/window.rs             |   2 +-
 3 files changed, 151 insertions(+), 421 deletions(-)
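For readers skimming the diff below: the rewrite buffers unsorted RecordBatches and defers sorting. The new `in_mem_sort_stream` then picks a strategy adaptively: a single buffered batch is sorted directly, inputs under roughly 1MB of reserved memory are concatenated with `concat_batches` and sorted once, and anything larger is sorted per batch and combined with `streaming_merge`. The per-batch primitive is the new `sort_batch`, which computes a sorted index vector and gathers every column by it. The following is a minimal, standalone sketch of that primitive against the arrow crate, not the DataFusion code itself; the column name "x" and the single ascending sort key are illustrative assumptions.

    // Minimal standalone sketch of the per-batch sort primitive (cf. sort_batch
    // in the diff below): compute a sorted index vector with lexsort_to_indices,
    // then gather every column with take. Column "x" is a made-up example.
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::compute::{lexsort_to_indices, take, SortColumn, SortOptions};
    use arrow::record_batch::RecordBatch;

    fn sort_by_column(batch: &RecordBatch, column: &str) -> arrow::error::Result<RecordBatch> {
        let sort_column = SortColumn {
            values: batch.column_by_name(column).expect("column exists").clone(),
            options: Some(SortOptions::default()),
        };
        // One pass to compute the row ordering...
        let indices = lexsort_to_indices(&[sort_column], None)?;
        // ...then reorder every column by that ordering.
        let columns = batch
            .columns()
            .iter()
            .map(|c| take(c.as_ref(), &indices, None))
            .collect::<arrow::error::Result<Vec<ArrayRef>>>()?;
        RecordBatch::try_new(batch.schema(), columns)
    }

    fn main() -> arrow::error::Result<()> {
        let batch = RecordBatch::try_from_iter(vec![(
            "x",
            Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef,
        )])?;
        let sorted = sort_by_column(&batch, "x")?;
        assert_eq!(sorted.num_rows(), 3);
        Ok(())
    }
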

diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 5f644b658b..9461f9689e 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -25,7 +25,7 @@ use crate::execution::memory_pool::{
     human_readable_size, MemoryConsumer, MemoryReservation,
 };
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
+use crate::physical_plan::common::{batch_byte_size, IPCWriter};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
     BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet,
@@ -34,28 +34,24 @@ use crate::physical_plan::sorts::merge::streaming_merge;
 use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
 use crate::physical_plan::{
     DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning,
-    RecordBatchStream, SendableRecordBatchStream, Statistics,
+    SendableRecordBatchStream, Statistics,
 };
 use crate::prelude::SessionConfig;
-use arrow::array::{make_array, Array, ArrayRef, MutableArrayData};
 pub use arrow::compute::SortOptions;
-use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions};
+use arrow::compute::{concat_batches, lexsort_to_indices, take};
 use arrow::datatypes::SchemaRef;
-use arrow::error::ArrowError;
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use datafusion_physical_expr::EquivalenceProperties;
-use futures::{Stream, StreamExt, TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
 use log::{debug, error};
 use std::any::Any;
-use std::cmp::{min, Ordering};
 use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use std::task::{Context, Poll};
 use tempfile::NamedTempFile;
 use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::task;
@@ -71,10 +67,11 @@ use tokio::task;
 /// 3. when input is exhausted, merge all in memory batches and spills to get a total order.
 struct ExternalSorter {
     schema: SchemaRef,
-    in_mem_batches: Vec<BatchWithSortArray>,
+    in_mem_batches: Vec<RecordBatch>,
+    in_mem_batches_sorted: bool,
     spills: Vec<NamedTempFile>,
     /// Sort expressions
-    expr: Vec<PhysicalSortExpr>,
+    expr: Arc<[PhysicalSortExpr]>,
     session_config: Arc<SessionConfig>,
     runtime: Arc<RuntimeEnv>,
     metrics_set: CompositeMetricsSet,
@@ -103,8 +100,9 @@ impl ExternalSorter {
         Self {
             schema,
             in_mem_batches: vec![],
+            in_mem_batches_sorted: true,
             spills: vec![],
-            expr,
+            expr: expr.into(),
             session_config,
             runtime,
             metrics_set,
@@ -115,47 +113,37 @@ impl ExternalSorter {
         }
     }
 
-    async fn insert_batch(
-        &mut self,
-        input: RecordBatch,
-        tracking_metrics: &MemTrackingMetrics,
-    ) -> Result<()> {
-        if input.num_rows() > 0 {
-            let size = batch_byte_size(&input);
-            if self.reservation.try_grow(size).is_err() {
+    /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
+    ///
+    /// Updates memory usage metrics, and possibly triggers spilling to disk
+    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() == 0 {
+            return Ok(());
+        }
+
+        let size = batch_byte_size(&input);
+        if self.reservation.try_grow(size).is_err() {
+            let before = self.reservation.size();
+            self.in_mem_sort().await?;
+            // Sorting may have freed memory, especially if fetch is not `None`
+            //
+            // As such we check again, and if the memory usage has dropped by
+            // a factor of 2, and we can allocate the necessary capacity,
+            // we don't spill
+            //
+            // The factor of 2 aims to avoid a degenerate case where the
+            // memory required for `fetch` is just under the memory available,
+            // causing repeated resorting of data
+            if self.reservation.size() > before / 2
+                || self.reservation.try_grow(size).is_err()
+            {
                 self.spill().await?;
                 self.reservation.try_grow(size)?
             }
-
-            self.metrics.mem_used().add(size);
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            let partial = sort_batch(input, self.schema.clone(), &self.expr, self.fetch)?;
-
-            // The resulting batch might be smaller (or larger, see #3747) than the input
-            // batch due to either a propagated limit or the re-construction of arrays. So
-            // for being reliable, we need to reflect the memory usage of the partial batch.
-            let new_size = batch_byte_size(&partial.sorted_batch);
-            match new_size.cmp(&size) {
-                Ordering::Greater => {
-                    // We don't have to call try_grow here, since we have already used the
-                    // memory (so spilling right here wouldn't help at all for the current
-                    // operation). But we still have to record it so that other requesters
-                    // would know about this unexpected increase in memory consumption.
-                    let new_size_delta = new_size - size;
-                    self.reservation.grow(new_size_delta);
-                    self.metrics.mem_used().add(new_size_delta);
-                }
-                Ordering::Less => {
-                    let size_delta = size - new_size;
-                    self.reservation.shrink(size_delta);
-                    self.metrics.mem_used().sub(size_delta);
-                }
-                Ordering::Equal => {}
-            }
-            self.in_mem_batches.push(partial);
         }
+        self.metrics.mem_used().add(size);
+        self.in_mem_batches.push(input);
+        self.in_mem_batches_sorted = false;
         Ok(())
     }
 
@@ -165,28 +153,18 @@ impl ExternalSorter {
 
     /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
     fn sort(&mut self) -> Result<SendableRecordBatchStream> {
-        let batch_size = self.session_config.batch_size();
-
         if self.spilled_before() {
             let intermediate_metrics = self
                 .metrics_set
                 .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
-            let mut merge_metrics = self
+
+            let merge_metrics = self
                 .metrics_set
                 .new_final_tracking(self.partition_id, &self.runtime.memory_pool);
 
             let mut streams = vec![];
             if !self.in_mem_batches.is_empty() {
-                let in_mem_stream = in_mem_partial_sort(
-                    &mut self.in_mem_batches,
-                    self.schema.clone(),
-                    &self.expr,
-                    batch_size,
-                    intermediate_metrics,
-                    self.fetch,
-                )?;
-                // TODO: More accurate, dynamic memory accounting (#5885)
-                merge_metrics.init_mem_used(self.reservation.free());
+                let in_mem_stream = self.in_mem_sort_stream(intermediate_metrics)?;
                 streams.push(in_mem_stream);
             }
 
@@ -206,14 +184,7 @@ impl ExternalSorter {
             let tracking_metrics = self
                 .metrics_set
                 .new_final_tracking(self.partition_id, &self.runtime.memory_pool);
-            let result = in_mem_partial_sort(
-                &mut self.in_mem_batches,
-                self.schema.clone(),
-                &self.expr,
-                batch_size,
-                tracking_metrics,
-                self.fetch,
-            );
+            let result = self.in_mem_sort_stream(tracking_metrics);
             // Report to the memory manager we are no longer using memory
             self.reservation.free();
             result
@@ -242,28 +213,92 @@ impl ExternalSorter {
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let tracking_metrics = self
-            .metrics_set
-            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+        self.in_mem_sort().await?;
 
         let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
-        let stream = in_mem_partial_sort(
-            &mut self.in_mem_batches,
-            self.schema.clone(),
-            &self.expr,
-            self.session_config.batch_size(),
-            tracking_metrics,
-            self.fetch,
-        );
-
-        spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
-            .await?;
+        let batches = std::mem::take(&mut self.in_mem_batches);
+        spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
         self.reservation.free();
         let used = self.metrics.mem_used().set(0);
         self.metrics.record_spill(used);
         self.spills.push(spillfile);
         Ok(used)
     }
+
+    /// Sorts the in_mem_batches in place
+    async fn in_mem_sort(&mut self) -> Result<()> {
+        if self.in_mem_batches_sorted {
+            return Ok(());
+        }
+
+        let tracking_metrics = self
+            .metrics_set
+            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
+
+        self.in_mem_batches = self
+            .in_mem_sort_stream(tracking_metrics)?
+            .try_collect()
+            .await?;
+
+        let size: usize = self
+            .in_mem_batches
+            .iter()
+            .map(|x| x.get_array_memory_size())
+            .sum();
+
+        self.metrics.mem_used().set(size);
+        self.reservation.resize(size);
+        self.in_mem_batches_sorted = true;
+        Ok(())
+    }
+
+    /// Consumes in_mem_batches returning a sorted stream
+    fn in_mem_sort_stream(
+        &mut self,
+        metrics: MemTrackingMetrics,
+    ) -> Result<SendableRecordBatchStream> {
+        assert_ne!(self.in_mem_batches.len(), 0);
+        if self.in_mem_batches.len() == 1 {
+            let batch = self.in_mem_batches.remove(0);
+            let stream =
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)?;
+            self.in_mem_batches.clear();
+            return Ok(stream);
+        }
+
+        // If less than 1MB of in-memory data, concatenate and sort in place
+        //
+        // This is a very rough heuristic and likely could be refined further
+        if self.reservation.size() < 1048576 {
+            // Concatenate memory batches together and sort
+            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            self.in_mem_batches.clear();
+            return sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics);
+        }
+
+        let streams = self
+            .in_mem_batches
+            .drain(..)
+            .map(|batch| {
+                let metrics = self.metrics_set.new_intermediate_tracking(
+                    self.partition_id,
+                    &self.runtime.memory_pool,
+                );
+                sort_batch_stream(batch, self.expr.clone(), self.fetch, metrics)
+            })
+            .collect::<Result<_>>()?;
+
+        // TODO: Run batch sorts concurrently (#6162)
+        // TODO: Pushdown fetch to streaming merge (#6000)
+
+        streaming_merge(
+            streams,
+            self.schema.clone(),
+            &self.expr,
+            metrics,
+            self.session_config.batch_size(),
+        )
+    }
 }
 
 impl Debug for ExternalSorter {
@@ -276,296 +311,50 @@ impl Debug for ExternalSorter {
     }
 }
 
-/// consume the non-empty `sorted_batches` and do in_mem_sort
-fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<BatchWithSortArray>,
-    schema: SchemaRef,
-    expressions: &[PhysicalSortExpr],
-    batch_size: usize,
-    tracking_metrics: MemTrackingMetrics,
+fn sort_batch_stream(
+    batch: RecordBatch,
+    expressions: Arc<[PhysicalSortExpr]>,
     fetch: Option<usize>,
+    mut tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
-    assert_ne!(buffered_batches.len(), 0);
-    if buffered_batches.len() == 1 {
-        let result = buffered_batches.pop();
-        Ok(Box::pin(SizedRecordBatchStream::new(
-            schema,
-            vec![Arc::new(result.unwrap().sorted_batch)],
-            tracking_metrics,
-        )))
-    } else {
-        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
-            buffered_batches
-                .drain(..)
-                .map(|b| {
-                    let BatchWithSortArray {
-                        sort_arrays,
-                        sorted_batch: batch,
-                    } = b;
-                    (sort_arrays, batch)
-                })
-                .unzip();
-
-        let sorted_iter = {
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
-        };
-        Ok(Box::pin(SortedSizedRecordBatchStream::new(
-            schema,
-            batches,
-            sorted_iter,
-            tracking_metrics,
-        )))
-    }
-}
-
-#[derive(Debug, Copy, Clone)]
-struct CompositeIndex {
-    batch_idx: u32,
-    row_idx: u32,
+    let schema = batch.schema();
+    tracking_metrics.init_mem_used(batch.get_array_memory_size());
+    let stream = futures::stream::once(futures::future::lazy(move |_| {
+        let sorted = sort_batch(&batch, &expressions, fetch)?;
+        tracking_metrics.record_output(sorted.num_rows());
+        Ok(sorted)
+    }));
+    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
 }
 
-/// Get sorted iterator by sort concatenated `SortColumn`s
-fn get_sorted_iter(
-    sort_arrays: &[Vec<ArrayRef>],
-    expr: &[PhysicalSortExpr],
-    batch_size: usize,
+fn sort_batch(
+    batch: &RecordBatch,
+    expressions: &[PhysicalSortExpr],
     fetch: Option<usize>,
-) -> Result<SortedIterator> {
-    let row_indices = sort_arrays
-        .iter()
-        .enumerate()
-        .flat_map(|(i, arrays)| {
-            (0..arrays[0].len()).map(move |r| CompositeIndex {
-                // since we original use UInt32Array to index the combined mono batch,
-                // component record batches won't overflow as well,
-                // use u32 here for space efficiency.
-                batch_idx: i as u32,
-                row_idx: r as u32,
-            })
-        })
-        .collect::<Vec<CompositeIndex>>();
-
-    let sort_columns = expr
+) -> Result<RecordBatch> {
+    let sort_columns = expressions
         .iter()
-        .enumerate()
-        .map(|(i, expr)| {
-            let columns_i = sort_arrays
-                .iter()
-                .map(|cs| cs[i].as_ref())
-                .collect::<Vec<&dyn Array>>();
-            Ok(SortColumn {
-                values: concat(columns_i.as_slice())?,
-                options: Some(expr.options),
-            })
-        })
+        .map(|expr| expr.evaluate_to_sort_column(batch))
         .collect::<Result<Vec<_>>>()?;
+
     let indices = lexsort_to_indices(&sort_columns, fetch)?;
 
-    // Calculate composite index based on sorted indices
-    let row_indices = indices
-        .values()
+    let columns = batch
+        .columns()
         .iter()
-        .map(|i| row_indices[*i as usize])
-        .collect();
-
-    Ok(SortedIterator::new(row_indices, batch_size))
-}
+        .map(|c| take(c.as_ref(), &indices, None))
+        .collect::<Result<_, _>>()?;
 
-struct SortedIterator {
-    /// Current logical position in the iterator
-    pos: usize,
-    /// Sorted composite index of where to find the rows in buffered batches
-    composite: Vec<CompositeIndex>,
-    /// Maximum batch size to produce
-    batch_size: usize,
+    Ok(RecordBatch::try_new(batch.schema(), columns)?)
 }
 
-impl SortedIterator {
-    fn new(composite: Vec<CompositeIndex>, batch_size: usize) -> Self {
-        Self {
-            pos: 0,
-            composite,
-            batch_size,
-        }
-    }
-
-    fn memory_size(&self) -> usize {
-        std::mem::size_of_val(self) + std::mem::size_of_val(&self.composite[..])
-    }
-}
-
-impl Iterator for SortedIterator {
-    type Item = Vec<CompositeSlice>;
-
-    /// Emit a max of `batch_size` positions each time
-    fn next(&mut self) -> Option<Self::Item> {
-        let length = self.composite.len();
-        if self.pos >= length {
-            return None;
-        }
-
-        let current_size = min(self.batch_size, length - self.pos);
-
-        // Combine adjacent indexes from the same batch to make a slice,
-        // for more efficient `extend` later.
-        let mut last_batch_idx = self.composite[self.pos].batch_idx;
-        let mut indices_in_batch = Vec::with_capacity(current_size);
-
-        let mut slices = vec![];
-        for ci in &self.composite[self.pos..self.pos + current_size] {
-            if ci.batch_idx != last_batch_idx {
-                group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-                last_batch_idx = ci.batch_idx;
-            }
-            indices_in_batch.push(ci.row_idx);
-        }
-
-        assert!(
-            !indices_in_batch.is_empty(),
-            "There should have at least one record in a sort output slice."
-        );
-        group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
-
-        self.pos += current_size;
-        Some(slices)
-    }
-}
-
-/// Group continuous indices into a slice for better `extend` performance
-fn group_indices(
-    batch_idx: u32,
-    positions: &mut Vec<u32>,
-    output: &mut Vec<CompositeSlice>,
-) {
-    positions.sort_unstable();
-    let mut last_pos = 0;
-    let mut run_length = 0;
-    for pos in positions.iter() {
-        if run_length == 0 {
-            last_pos = *pos;
-            run_length = 1;
-        } else if *pos == last_pos + 1 {
-            run_length += 1;
-            last_pos = *pos;
-        } else {
-            output.push(CompositeSlice {
-                batch_idx,
-                start_row_idx: last_pos + 1 - run_length,
-                len: run_length as usize,
-            });
-            last_pos = *pos;
-            run_length = 1;
-        }
-    }
-    assert!(
-        run_length > 0,
-        "There should have at least one record in a sort output slice."
-    );
-    output.push(CompositeSlice {
-        batch_idx,
-        start_row_idx: last_pos + 1 - run_length,
-        len: run_length as usize,
-    });
-    positions.clear()
-}
-
-/// Stream of sorted record batches
-struct SortedSizedRecordBatchStream {
-    schema: SchemaRef,
+async fn spill_sorted_batches(
     batches: Vec<RecordBatch>,
-    sorted_iter: SortedIterator,
-    num_cols: usize,
-    metrics: MemTrackingMetrics,
-}
-
-impl SortedSizedRecordBatchStream {
-    /// new
-    pub fn new(
-        schema: SchemaRef,
-        batches: Vec<RecordBatch>,
-        sorted_iter: SortedIterator,
-        mut metrics: MemTrackingMetrics,
-    ) -> Self {
-        let size = batches.iter().map(batch_byte_size).sum::<usize>()
-            + sorted_iter.memory_size();
-        metrics.init_mem_used(size);
-        let num_cols = batches[0].num_columns();
-        SortedSizedRecordBatchStream {
-            schema,
-            batches,
-            sorted_iter,
-            num_cols,
-            metrics,
-        }
-    }
-}
-
-impl Stream for SortedSizedRecordBatchStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        match self.sorted_iter.next() {
-            None => Poll::Ready(None),
-            Some(slices) => {
-                let num_rows = slices.iter().map(|s| s.len).sum();
-                let output = (0..self.num_cols)
-                    .map(|i| {
-                        let arrays = self
-                            .batches
-                            .iter()
-                            .map(|b| b.column(i).to_data())
-                            .collect::<Vec<_>>();
-                        let arrays = arrays.iter().collect();
-                        let mut mutable = MutableArrayData::new(arrays, false, num_rows);
-                        for x in slices.iter() {
-                            mutable.extend(
-                                x.batch_idx as usize,
-                                x.start_row_idx as usize,
-                                x.start_row_idx as usize + x.len,
-                            );
-                        }
-                        make_array(mutable.freeze())
-                    })
-                    .collect::<Vec<_>>();
-                let batch =
-                    RecordBatch::try_new(self.schema.clone(), output).map_err(Into::into);
-                let poll = Poll::Ready(Some(batch));
-                self.metrics.record_poll(poll)
-            }
-        }
-    }
-}
-
-struct CompositeSlice {
-    batch_idx: u32,
-    start_row_idx: u32,
-    len: usize,
-}
-
-impl RecordBatchStream for SortedSizedRecordBatchStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-async fn spill_partial_sorted_stream(
-    in_mem_stream: &mut SendableRecordBatchStream,
     path: &Path,
     schema: SchemaRef,
 ) -> Result<()> {
-    let (sender, receiver) = tokio::sync::mpsc::channel(2);
     let path: PathBuf = path.into();
-    let handle = task::spawn_blocking(move || write_sorted(receiver, path, schema));
-    while let Some(item) = in_mem_stream.next().await {
-        sender.send(item).await.ok();
-    }
-    drop(sender);
+    let handle = task::spawn_blocking(move || write_sorted(batches, path, schema));
     match handle.await {
         Ok(r) => r,
         Err(e) => Err(DataFusionError::Execution(format!(
@@ -593,13 +382,13 @@ fn read_spill_as_stream(
 }
 
 fn write_sorted(
-    mut receiver: Receiver<Result<RecordBatch>>,
+    batches: Vec<RecordBatch>,
     path: PathBuf,
     schema: SchemaRef,
 ) -> Result<()> {
     let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
-    while let Some(batch) = receiver.blocking_recv() {
-        writer.write(&batch?)?;
+    for batch in batches {
+        writer.write(&batch)?;
     }
     writer.finish()?;
     debug!(
@@ -845,63 +634,6 @@ impl ExecutionPlan for SortExec {
     }
 }
 
-struct BatchWithSortArray {
-    sort_arrays: Vec<ArrayRef>,
-    sorted_batch: RecordBatch,
-}
-
-fn sort_batch(
-    batch: RecordBatch,
-    schema: SchemaRef,
-    expr: &[PhysicalSortExpr],
-    fetch: Option<usize>,
-) -> Result<BatchWithSortArray> {
-    let sort_columns = expr
-        .iter()
-        .map(|e| e.evaluate_to_sort_column(&batch))
-        .collect::<Result<Vec<SortColumn>>>()?;
-
-    let indices = lexsort_to_indices(&sort_columns, fetch)?;
-
-    // reorder all rows based on sorted indices
-    let sorted_batch = RecordBatch::try_new(
-        schema,
-        batch
-            .columns()
-            .iter()
-            .map(|column| {
-                take(
-                    column.as_ref(),
-                    &indices,
-                    // disable bound check overhead since indices are already generated from
-                    // the same record batch
-                    Some(TakeOptions {
-                        check_bounds: false,
-                    }),
-                )
-            })
-            .collect::<Result<Vec<ArrayRef>, ArrowError>>()?,
-    )?;
-
-    let sort_arrays = sort_columns
-        .into_iter()
-        .map(|sc| {
-            Ok(take(
-                sc.values.as_ref(),
-                &indices,
-                Some(TakeOptions {
-                    check_bounds: false,
-                }),
-            )?)
-        })
-        .collect::<Result<Vec<_>>>()?;
-
-    Ok(BatchWithSortArray {
-        sort_arrays,
-        sorted_batch,
-    })
-}
-
 async fn do_sort(
     mut input: SendableRecordBatchStream,
     partition_id: usize,
@@ -917,8 +649,6 @@ async fn do_sort(
         context.task_id()
     );
     let schema = input.schema();
-    let tracking_metrics =
-        metrics_set.new_intermediate_tracking(partition_id, context.memory_pool());
     let mut sorter = ExternalSorter::new(
         partition_id,
         schema.clone(),
@@ -930,7 +660,7 @@ async fn do_sort(
     );
     while let Some(batch) = input.next().await {
         let batch = batch?;
-        sorter.insert_batch(batch, &tracking_metrics).await?;
+        sorter.insert_batch(batch).await?;
     }
     let result = sorter.sort();
     debug!(
@@ -1089,7 +819,7 @@ mod tests {
     #[tokio::test]
     async fn test_sort_fetch_memory_calculation() -> Result<()> {
         // This test mirrors down the size from the example above.
-        let avg_batch_size = 5000;
+        let avg_batch_size = 4000;
         let partitions = 4;
 
         // A tuple of (fetch, expect_spillage)
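
One note on the new spill-avoidance check in `insert_batch` above: when the reservation cannot grow, the sorter first sorts in place (which can shrink memory use, especially when `fetch` is set) and only spills if usage did not drop below half of its previous level or the new allocation still cannot be reserved. A tiny standalone illustration of that decision rule, with made-up numbers; the helper below is hypothetical, the real code consults a MemoryReservation.

    // Sketch of the spill decision in insert_batch: after the in-place sort,
    // spill only if memory did not at least halve, or if the incoming batch
    // still cannot be reserved.
    fn should_spill(before: usize, after_sort: usize, can_reserve_new_batch: bool) -> bool {
        after_sort > before / 2 || !can_reserve_new_batch
    }

    fn main() {
        // fetch pruned most rows: usage fell from 10 MiB to 1 MiB and the new
        // batch fits, so no spill is needed
        assert!(!should_spill(10 << 20, 1 << 20, true));
        // sorting freed little memory, so spill even though the batch would fit
        assert!(should_spill(10 << 20, 9 << 20, true));
    }
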
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index fdf69ec0d1..1f72e0fcb4 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -118,7 +118,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
             RecordBatch::try_from_iter(vec![(
                 "x",
                 Arc::new(Int32Array::from_iter_values(
-                    std::iter::from_fn(|| Some(rng.gen())).take(to_read),
+                    (0..to_read).map(|_| rng.gen()),
                 )) as ArrayRef,
             )])
             .unwrap(),
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 7e7ed031f1..f63395e455 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -205,8 +205,8 @@ mod tests {
             "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
             "| fv1 | fv2 | lv1 | lv2 | nv1 | nv2 | rn1 | rn2 | rank1 | rank2 | dense_rank1 | dense_rank2 | lag1 | lag2 | lead1 | lead2 | fvr1 | fvr2 | lvr1 | lvr2 | lagr1 | lagr2 | leadr1 | leadr2 |",
             "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
-            "| 289 | 266 | 305 | 305 | 305 | 278 | 99  | 99  | 99    | 99    | 86          | 86          | 296  | 291  | 296   | 1004  | 305  | 305  | 301  | 296  | 305   | 1002  | 305    | 286    |",
             "| 289 | 269 | 305 | 305 | 305 | 283 | 100 | 100 | 99    | 99    | 86          | 86          | 301  | 296  | 301   | 1004  | 305  | 305  | 301  | 301  | 1001  | 1002  | 1001   | 289    |",
+            "| 289 | 266 | 305 | 305 | 305 | 278 | 99  | 99  | 99    | 99    | 86          | 86          | 296  | 291  | 296   | 1004  | 305  | 305  | 301  | 296  | 305   | 1002  | 305    | 286    |",
             "| 289 | 261 | 296 | 301 |     | 275 | 98  | 98  | 98    | 98    | 85          | 85          | 291  | 289  | 291   | 1004  | 305  | 305  | 296  | 291  | 301   | 305   | 301    | 283    |",
             "| 286 | 259 | 291 | 296 |     | 272 | 97  | 97  | 97    | 97    | 84          | 84          | 289  | 286  | 289   | 1004  | 305  | 305  | 291  | 289  | 296   | 301   | 296    | 278    |",
             "| 275 | 254 | 289 | 291 | 289 | 269 | 96  | 96  | 96    | 96    | 83          | 83          | 286  | 283  | 286   | 305   | 305  | 305  | 289  | 286  | 291   | 296   | 291    | 275    |",