Posted to github@arrow.apache.org by "Dandandan (via GitHub)" <gi...@apache.org> on 2023/10/02 12:06:51 UTC

[PR] Topk [arrow-datafusion]

Dandandan opened a new pull request, #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721

   ## Which issue does this PR close?
   
   Closes #.
   
   ## Rationale for this change
   
   ## What changes are included in this PR?
   
   ## Are these changes tested?
   
   ## Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#issuecomment-1748704910

   Thanks @alamb for the review. I plan to test it on our side (Coralogix) and see whether any follow-up is necessary.
   
   > The only query it gets slower for is large N with multiple files. I believe this is because reconstructing the 10,000 row outputs for each of the partitions, merging them, and then reconstructing the heap is fairly expensive. It would be better in this case to avoid the sort and do a final TopK.
   
   This sounds like a good idea, although for distributed usage (e.g. Coralogix) it might not be beneficial, as we would need to fetch all partitions instead of doing TopK + merge in a distributed manner.
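
The two strategies discussed above can be compared on plain integers. This is a std-only Rust sketch, not DataFusion code: `topk` and `merge_sorted` are hypothetical helpers, and `i64` values stand in for sort keys. Each partition first keeps its own k smallest values; the partial results are then either merged in sorted order and cut off after k rows (TopK + merge), or fed into one more TopK (final TopK). Both return the same rows; they differ only in how much work the final step does, which is the cost trade-off discussed in this thread.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Per-partition TopK: keeps the `k` smallest values in a bounded max-heap
/// and returns them in ascending order.
fn topk(input: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    let mut heap = BinaryHeap::with_capacity(k + 1);
    for v in input {
        heap.push(v);
        if heap.len() > k {
            heap.pop(); // evict the current largest value
        }
    }
    heap.into_sorted_vec()
}

/// TopK + merge: k-way merge of already sorted partial results, stopping
/// after `k` values.
fn merge_sorted(partials: &[Vec<i64>], k: usize) -> Vec<i64> {
    // Min-heap (via `Reverse`) of (value, partition index, position).
    let mut heap = BinaryHeap::new();
    for (p, part) in partials.iter().enumerate() {
        if let Some(&v) = part.first() {
            heap.push(Reverse((v, p, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(k);
    while out.len() < k {
        let Reverse((v, p, i)) = match heap.pop() {
            Some(entry) => entry,
            None => break, // all partitions exhausted
        };
        out.push(v);
        // Advance within the partition the value came from.
        if let Some(&next) = partials[p].get(i + 1) {
            heap.push(Reverse((next, p, i + 1)));
        }
    }
    out
}

fn main() {
    let partitions: Vec<Vec<i64>> = vec![vec![9, 3, 7, 1], vec![8, 2, 6], vec![5, 4, 0]];
    let k = 3;
    let partials: Vec<Vec<i64>> = partitions
        .iter()
        .map(|p| topk(p.iter().copied(), k))
        .collect();

    let merged = merge_sorted(&partials, k);
    let final_topk = topk(partials.iter().flatten().copied(), k);
    assert_eq!(merged, final_topk);
    println!("{:?}", merged); // [0, 1, 2]
}
```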


Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1345462327


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// K elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Memory reservation for this operator
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guess 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items, or the new item is lower than the current max of the k lowest values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() < batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5`, the "top 3" really means the
+/// *smallest* 3, `1, 2, 3`, not the *largest* 3, `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It uses the `BinaryHeap` from the Rust standard library as a
+/// max-heap, so that the current largest retained value can be
+/// checked (via `peek`) before creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elements to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for at most `k` items using a BinaryHeap. The largest of
+    /// the k smallest values seen so far is on the top
+    inner: BinaryHeap<TopKRow>,
+    /// Stores the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, evicts the current largest
+    /// item first.
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)
+            .map(|col| {
+                let input_arrays: Vec<_> = topk_rows
+                    .iter()
+                    .map(|k| {
+                        let entry =
+                            self.store.get(k.batch_id).expect("invalid stored batch id");
+                        entry.batch.column(col) as &dyn Array
+                    })
+                    .collect();
+
+                // at this point `indices` contains indexes within the
+                // rows and `input_arrays` contains a reference to the
+                // relevant Array for that index. `interleave` pulls
+                // them together into a single new array
+                Ok(interleave(&input_arrays, &indices)?)
+            })
+            .collect::<Result<_>>()?;
+
+        let new_batch = RecordBatch::try_new(schema, output_columns)?;
+        Ok((new_batch, topk_rows))
+    }
+
+    /// Compact this heap, rewriting all stored batches into a single
+    /// input batch
+    pub fn maybe_compact(&mut self) -> Result<()> {
+        // we compact if the number of "unused" rows in the store is

Review Comment:
   I think the heuristic is fine:
   - it ensures we compact at most once every `n` (> 20) batches of input, and less often if the batches are well utilized
   - compaction reduces the number of rows to `k`. 20 * 8192 = 163840 rows. If we have some wider columns of 1 kB each, the memory usage could be ~200 MB with some overhead. Thinking about it, I wonder if we also need to trigger compaction when it exceeds the configured memory limit 🤔 
   - for very large `k` (a number of times the batch size) we avoid doing compaction too often
   
   We can tweak the heuristic later if there are cases that benefit from it.
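
The arithmetic in the second bullet, and the extra memory-based trigger it wonders about, can be sketched as follows. Both functions are hypothetical illustrations, not DataFusion APIs: the unused-row threshold is passed in as a parameter because the PR's actual condition is cut off in the diff context above, and `memory_limit` is an assumed configuration value.

```rust
/// Rough size of buffered data before compaction: `batches * batch_size`
/// rows of `row_bytes` each (ignores per-array overhead).
fn buffered_bytes_estimate(batches: usize, batch_size: usize, row_bytes: usize) -> usize {
    batches * batch_size * row_bytes
}

/// Hypothetical compaction trigger: compact when enough stored rows are no
/// longer referenced by the heap, or (the suggested addition) when the
/// buffered bytes exceed an optional memory limit.
fn should_compact(
    unused_rows: usize,
    min_unused_rows: usize,
    buffered_bytes: usize,
    memory_limit: Option<usize>,
) -> bool {
    let too_many_unused = unused_rows > min_unused_rows;
    let over_limit = memory_limit.map_or(false, |limit| buffered_bytes > limit);
    too_many_unused || over_limit
}

fn main() {
    let rows: usize = 20 * 8192;
    let bytes = buffered_bytes_estimate(20, 8192, 1024);
    // 163840 rows * 1 KiB = 167772160 bytes = 160 MiB, before overhead
    println!("{rows} rows, {} MiB", bytes / (1024 * 1024));

    // Few unused rows, but a 256 MiB limit is exceeded: compact anyway.
    assert!(should_compact(1_000, rows, 300 << 20, Some(256 << 20)));
}
```

160 MiB of raw row data is in line with the ~200 MB estimate once some array and bookkeeping overhead is included.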



Re: [PR] Topk [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1343962711


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();

Review Comment:
   Replaced with `into_sorted_vec`, which sorts the heap's existing (already heap-ordered) buffer in place.
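The standard library's `BinaryHeap` is a max-heap: `peek` returns the largest retained value, and `into_sorted_vec` consumes the heap and returns its contents in ascending order. The sketch below shows the same keep-the-k-smallest pattern as `max()` / `add()` above, but on plain `Ord` values instead of arrow `Rows`. `smallest_k` is a hypothetical helper for illustration, not part of the PR or of DataFusion.

```rust
use std::collections::BinaryHeap;

/// Hypothetical helper: keep the `k` smallest values seen so far using a
/// max-heap, mirroring the `TopKHeap::max` / `TopKHeap::add` pattern.
pub fn smallest_k<T: Ord + Copy>(values: &[T], k: usize) -> Vec<T> {
    assert!(k > 0);
    let mut heap = BinaryHeap::with_capacity(k);
    for &v in values {
        if heap.len() < k {
            // Fewer than `k` items so far: always insert.
            heap.push(v);
        } else {
            // Heap is full (k > 0), so `peek` returns the current max.
            let max = *heap.peek().expect("heap holds k > 0 items");
            // Like the PR, skip values >= max; otherwise evict the max.
            if v < max {
                heap.pop();
                heap.push(v);
            }
        }
    }
    // Consumes the heap and returns the retained values in ascending order.
    heap.into_sorted_vec()
}
```

For `ORDER BY revenue DESC LIMIT 3`, wrapping the keys in `std::cmp::Reverse` makes the same function keep the three largest values. The PR gets this from the arrow row encoding instead, which bakes sort direction and null ordering into the byte comparison.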



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1344546762


##########
datafusion/sqllogictest/test_files/aal.slt:
##########
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Aha, I was wondering what `aal` stood for 🤣





Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1345462327


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// K elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guestimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items, or the new item is lower than the current max of the k lowest values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() <= batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3, `1, 2, 3`, not the *largest* 3 `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It uses `BinaryHeap` from the Rust standard library (a max-heap),
+/// so the current maximum can be checked with `peek` prior to
+/// creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elements to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for at most `k` items using a BinaryHeap (a max-heap),
+    /// so the largest of the k smallest values seen so far is on top
+    inner: BinaryHeap<TopKRow>,
+    /// Stores the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, first removes the current largest
+    /// item (this heap keeps the k smallest).
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at a time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)
+            .map(|col| {
+                let input_arrays: Vec<_> = topk_rows
+                    .iter()
+                    .map(|k| {
+                        let entry =
+                            self.store.get(k.batch_id).expect("invalid stored batch id");
+                        entry.batch.column(col) as &dyn Array
+                    })
+                    .collect();
+
+                // at this point `indices` contains indexes within the
+                // rows and `input_arrays` contains a reference to the
+                // relevant Array for that index. `interleave` pulls
+                // them together into a single new array
+                Ok(interleave(&input_arrays, &indices)?)
+            })
+            .collect::<Result<_>>()?;
+
+        let new_batch = RecordBatch::try_new(schema, output_columns)?;
+        Ok((new_batch, topk_rows))
+    }
+
+    /// Compact this heap, rewriting all stored batches into a single
+    /// input batch
+    pub fn maybe_compact(&mut self) -> Result<()> {
+        // we compact if the number of "unused" rows in the store is

Review Comment:
   I think the heuristic is fine:
   - it ensures compaction happens at most once every `n` (> 20) input batches, and less often if the batches are well utilized
   - compaction reduces the number of stored rows to `k`. 20 * 8192 = 163840 rows. If we have some wider columns of 1kB each, the memory usage could be ~200MB with some overhead. Thinking about it, I wonder if we also need to trigger compaction when it exceeds the configured memory limit 🤔 
   - for very large `k` (a number of times the batch size) we avoid doing compaction too often
   
   We can tweak the heuristic later if there are cases that benefit from it.
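To make the trade-offs in this discussion concrete, here is a hypothetical trigger, `should_compact`, combining the row-count slack described above with the memory-limit check the comment wonders about. The thresholds (about 20 batches of slack, or `k` rows when `k` is large) and all parameter names are assumptions for illustration; the PR's actual condition is cut off in the quoted diff.

```rust
/// Hypothetical compaction trigger sketched from the review discussion.
/// NOT the PR's implementation (its condition is truncated in the diff).
pub fn should_compact(
    stored_rows: usize,  // total rows across all batches held by the store
    k: usize,            // rows the heap actually needs to keep
    batch_size: usize,   // target rows per batch
    stored_bytes: usize, // approximate memory held by the stored batches
    memory_limit: usize, // hypothetical memory budget for this operator
) -> bool {
    // Rows kept alive only because they share a batch with a top-k row
    // (assumes the heap is full, i.e. holds exactly k rows).
    let unused_rows = stored_rows.saturating_sub(k);
    // Allow about 20 batches of slack, or `k` rows when `k` is very large,
    // so compaction does not run after every input batch.
    let row_threshold = std::cmp::max(20 * batch_size, k);
    // Also compact when the stored batches exceed the memory budget.
    unused_rows > row_threshold || stored_bytes > memory_limit
}
```

For the arithmetic in the comment: 20 × 8192 = 163,840 rows, and at roughly 1,000 bytes per row that is about 164 MB of column data before any per-batch overhead, consistent with the ~200MB estimate.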



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1345462327


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TOOD: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guestimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items or new item is lower than the currently k low values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() < batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3 , `1, 2, 3`, not the *largest* 3 `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It doesn't use `BinaryHeap` in the Rust standard library because
+/// it is important to check the current minimum value in the heap
+/// prior to creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elemenents to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for up at most `k` items using a BinaryHeap. Reverserd
+    /// so that the smallest k so far is on the top
+    inner: BinaryHeap<TopKRow>,
+    /// Storage the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, removes the previously smallest
+    /// item.
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)
+            .map(|col| {
+                let input_arrays: Vec<_> = topk_rows
+                    .iter()
+                    .map(|k| {
+                        let entry =
+                            self.store.get(k.batch_id).expect("invalid stored batch id");
+                        entry.batch.column(col) as &dyn Array
+                    })
+                    .collect();
+
+                // at this point `indices` contains indexes within the
+                // rows and `input_arrays` contains a reference to the
+                // relevant Array for that index. `interleave` pulls
+                // them together into a single new array
+                Ok(interleave(&input_arrays, &indices)?)
+            })
+            .collect::<Result<_>>()?;
+
+        let new_batch = RecordBatch::try_new(schema, output_columns)?;
+        Ok((new_batch, topk_rows))
+    }
+
+    /// Compact this heap, rewriting all stored batches into a single
+    /// input batch
+    pub fn maybe_compact(&mut self) -> Result<()> {
+        // we compact if the number of "unused" rows in the store is

Review Comment:
   I think the heuristic is fine:
   - it ensures we compact at most once every `n` (> 20) batches of input, or less often if the batches are still partly utilized
   - compaction reduces the number of rows to `k`. 20 * 8192 = 163840 rows. If we have some wider columns of 1KB each, the memory usage could be ~200MB with some overhead. Thinking about it, I wonder if we also need to trigger compaction when it exceeds the configured memory limit 🤔 
   - for very large `k` (a number of times the batch size) we avoid doing compaction too often
   
   We can tweak the heuristic later if there are cases that would benefit from it.
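
   The heuristic above can be sketched in Rust. The excerpt is truncated before the actual threshold in `maybe_compact`, so the `20 * batch_size + k` limit and the `should_compact` / `buffered_bytes_before_compaction` helpers below are hypothetical, chosen only to match the three properties listed. The byte estimate assumes a fixed ~1 KiB row width and ignores array and allocation overhead.

```rust
/// Hypothetical threshold (the real one is not visible in this excerpt):
/// tolerate about 20 batches' worth of unused rows, plus `k` so that a very
/// large `k` does not force a compaction after every batch.
fn should_compact(unused_rows: usize, batch_size: usize, k: usize) -> bool {
    let max_unused_rows = 20 * batch_size + k;
    unused_rows >= max_unused_rows
}

/// Rough estimate of the bytes buffered when the threshold is reached,
/// assuming every row is `row_width_bytes` wide (no array overhead).
fn buffered_bytes_before_compaction(batch_size: usize, row_width_bytes: usize) -> usize {
    20 * batch_size * row_width_bytes
}

fn main() {
    let batch_size = 8192;
    // 20 * 8192 = 163_840 rows may accumulate before compacting down to `k`
    assert_eq!(20 * batch_size, 163_840);
    // at ~1 KiB per row that is 160 MiB before any overhead
    assert_eq!(buffered_bytes_before_compaction(batch_size, 1024), 160 * 1024 * 1024);

    // small k: the 20-batch term dominates the threshold
    assert!(!should_compact(100_000, batch_size, 10));
    assert!(should_compact(200_000, batch_size, 10));
    // large k raises the threshold, so compaction happens less often
    assert!(!should_compact(200_000, batch_size, 100_000));
}
```

   Adding `k` to the threshold is what keeps a very large `k` from triggering compaction on nearly every batch, matching the third bullet.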



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Topk [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#issuecomment-1742975463

   Thank you @Dandandan  -- let me know if you would like help finishing up this PR. It has been on my list but I haven't had a chance yet.
   
   Maybe I could make a PR that changed the display of plans to show when topk was being used 🤔 




Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1347935978


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -765,7 +766,12 @@ impl DisplayAs for SortExec {
                 let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
                 match self.fetch {
                     Some(fetch) => {
-                        write!(f, "SortExec: fetch={fetch}, expr=[{}]", expr.join(","))
+                        write!(
+                            f,
+                            // TODO should this say topk?

Review Comment:
   https://github.com/apache/arrow-datafusion/issues/7750 tracks this work
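
   One possible answer to the TODO, as a std-only sketch: `SortDisplay` is a hypothetical stand-in for `SortExec` with only the fields needed here, and the `TopK(fetch=...)` wording is an assumption, not necessarily the format settled on in the issue above.

```rust
use std::fmt;

/// Hypothetical, simplified stand-in for `SortExec`: just enough state to
/// show how a plan line could say that the TopK path is used.
struct SortDisplay {
    exprs: Vec<String>,
    fetch: Option<usize>,
}

impl fmt::Display for SortDisplay {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let expr = self.exprs.join(",");
        match self.fetch {
            // a fetch limit means the sort can be executed as a TopK
            Some(fetch) => write!(f, "SortExec: TopK(fetch={fetch}), expr=[{expr}]"),
            None => write!(f, "SortExec: expr=[{expr}]"),
        }
    }
}

fn main() {
    let topk = SortDisplay {
        exprs: vec!["revenue@1 DESC".to_string()],
        fetch: Some(3),
    };
    // prints: SortExec: TopK(fetch=3), expr=[revenue@1 DESC]
    println!("{topk}");
}
```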





Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1346118763


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// K elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Memory reservation, resized to the current size of this operator
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guesstimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items, or the new row is lower than the current max of the k lowest values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() <= batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// baseline metrics (elapsed compute time, output rows)
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3, `1, 2, 3`, not the *largest* 3 `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It wraps the `BinaryHeap` from the Rust standard library rather
+/// than using it directly, because it is important to check the
+/// current maximum value in the heap prior to creating a new value
+/// to insert.
+struct TopKHeap {
+    /// The maximum number of elements to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for at most `k` items using a BinaryHeap. As a max-heap,
+    /// it keeps the largest of the smallest k values so far on top
+    inner: BinaryHeap<TopKRow>,
+    /// Stores the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, first removes the current largest
+    /// item.
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at a time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)
+            .map(|col| {
+                let input_arrays: Vec<_> = topk_rows
+                    .iter()
+                    .map(|k| {
+                        let entry =
+                            self.store.get(k.batch_id).expect("invalid stored batch id");
+                        entry.batch.column(col) as &dyn Array
+                    })
+                    .collect();
+
+                // at this point `indices` contains indexes within the
+                // rows and `input_arrays` contains a reference to the
+                // relevant Array for that index. `interleave` pulls
+                // them together into a single new array
+                Ok(interleave(&input_arrays, &indices)?)
+            })
+            .collect::<Result<_>>()?;
+
+        let new_batch = RecordBatch::try_new(schema, output_columns)?;
+        Ok((new_batch, topk_rows))
+    }
+
+    /// Compact this heap, rewriting all stored batches into a single
+    /// input batch
+    pub fn maybe_compact(&mut self) -> Result<()> {
+        // we compact if the number of "unused" rows in the store is

Review Comment:
   Sounds good -- thank you. 
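The eviction logic in `add` above (pop the current max once the heap holds `k` rows, then release that row's reference to its batch) can be modelled without Arrow. This is a minimal sketch, assuming byte-vector sort keys and a plain `HashMap` of per-batch use counts; `BoundedHeap`, `Entry`, `offer`, and `uses` are hypothetical names, not the PR's `TopKHeap` / `RecordBatchStore` API, and it allocates a fresh key instead of reusing storage the way `with_new_row` does.

```rust
use std::collections::{BinaryHeap, HashMap};

/// One retained row: its sort key plus the batch/row it came from.
/// Derived Ord compares `key` first, so the max-heap root is the largest key.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Entry {
    key: Vec<u8>,
    batch_id: usize,
    index: usize,
}

/// Hypothetical stand-in for TopKHeap: keeps the k smallest keys.
struct BoundedHeap {
    k: usize,
    inner: BinaryHeap<Entry>,
    /// batch_id -> number of retained rows that still point into that batch
    uses: HashMap<usize, usize>,
}

impl BoundedHeap {
    fn new(k: usize) -> Self {
        Self { k, inner: BinaryHeap::new(), uses: HashMap::new() }
    }

    /// Largest retained key, but only once the heap is full (like `max` above).
    fn max(&self) -> Option<&Entry> {
        if self.inner.len() < self.k {
            None
        } else {
            self.inner.peek()
        }
    }

    /// Offers a row; returns true if it was retained.
    fn offer(&mut self, key: &[u8], batch_id: usize, index: usize) -> bool {
        if self.k == 0 {
            return false;
        }
        // same rule as insert_batch: ties with the current max are rejected
        if let Some(max) = self.max() {
            if key >= max.key.as_slice() {
                return false;
            }
        }
        if self.inner.len() == self.k {
            // evict the current largest row and drop its batch reference
            let evicted = self.inner.pop().expect("heap holds k items");
            let n = self.uses.get_mut(&evicted.batch_id).expect("batch registered");
            *n -= 1;
            if *n == 0 {
                // no retained row needs this batch any more; it could be freed
                self.uses.remove(&evicted.batch_id);
            }
        }
        *self.uses.entry(batch_id).or_insert(0) += 1;
        self.inner.push(Entry { key: key.to_vec(), batch_id, index });
        true
    }
}

fn main() {
    let mut heap = BoundedHeap::new(2);
    heap.offer(&[5], 0, 0); // batch 0, row 0
    heap.offer(&[3], 0, 1); // batch 0, row 1
    heap.offer(&[1], 1, 0); // batch 1, row 0: evicts key 5 from batch 0
    heap.offer(&[4], 1, 1); // rejected: 4 >= current max 3
    let mut kept: Vec<(u8, usize, usize)> =
        heap.inner.iter().map(|e| (e.key[0], e.batch_id, e.index)).collect();
    kept.sort();
    println!("kept = {:?}, uses = {:?}", kept, heap.uses);
}
```

Tracking use counts per batch is what lets a compaction step (like `maybe_compact`) know which stored batches are only partially referenced.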



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1346119499


##########
datafusion/sqllogictest/test_files/topk.slt:
##########
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for development
+
+statement ok
+create table topk(x int) as values (10), (2), (3), (0), (5), (4), (3), (2), (1), (3), (8);
+
+query I
+select * from topk order by x;
+----
+0
+1
+2
+2
+3
+3
+3
+4
+5
+8
+10
+
+query I
+select * from topk order by x limit 3;
+----
+0
+1
+2
+
+query I
+select * from topk order by x desc limit 3;
+----
+10
+8
+5
+
+
+
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+  c1  VARCHAR NOT NULL,
+  c2  TINYINT NOT NULL,
+  c3  SMALLINT NOT NULL,
+  c4  SMALLINT,
+  c5  INT,
+  c6  BIGINT NOT NULL,
+  c7  SMALLINT NOT NULL,
+  c8  INT NOT NULL,
+  c9  BIGINT UNSIGNED NOT NULL,
+  c10 VARCHAR NOT NULL,
+  c11 FLOAT NOT NULL,
+  c12 DOUBLE NOT NULL,
+  c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+
+query TT
+explain select * from aggregate_test_100 ORDER BY c13 desc limit 5;
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: aggregate_test_100.c13 DESC NULLS FIRST, fetch=5
+----TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--SortExec: fetch=5, expr=[c13@12 DESC]
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
+
+
+
+
+query T
+select c13 from aggregate_test_100 ORDER BY c13;
+----
+0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm
+0keZ5G8BffGwgF2RwQD59TFzMStxCB
+0og6hSkhbX8AC1ktFS4kounvTzy8Vo
+1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO
+2T3wSlHdEmASmO0xcXHnndkKEt6bz8
+3BEOHQsMEFZ58VcNTOJYShTBpAPzbt
+4HX6feIvmNXBN7XGqgO4YVBkhu8GDI
+4JznSdBajNWhu4hRQwjV1FjTTxY68i
+52mKlRE3aHCBZtjECq6sY9OqVf8Dze
+56MZa5O1hVtX4c5sbnCfxuX5kDChqI
+6FPJlLAcaQ5uokyOWZ9HGdLZObFvOZ
+6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW
+6oIXZuIPIqEoPBvFmbt2Nxy3tryGUE
+6x93sxYioWuq5c9Kkk8oTAAORM7cH0
+802bgTGl6Bk5TlkPYYTxp5JkKyaYUA
+8LIh0b6jmDGm87BmIyjdxNIpX4ugjD
+90gAtmGEeIqUTbo1ZrxCvWtsseukXC
+9UbObCsVkmYpJGcGrgfK90qOnwb2Lj
+AFGCj7OWlEB5QfniEFgonMq90Tq5uH
+ALuRhobVWbnQTTWZdSOk0iVe8oYFhW
+Amn2K87Db5Es3dFQO9cw9cvpAM6h35
+AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz
+BJqx5WokrmrrezZA0dUbleMYkG5U2O
+BPtQMxnuSPpxMExYV9YkDa6cAN7GP3
+BsM5ZAYifRh5Lw3Y8X1r53I0cTJnfE
+C2GT5KVyOPZpgKVl110TyZO0NcJ434
+DuJNG8tufSqW0ZstHqWj3aGvFLMg4A
+EcCuckwsF3gV1Ecgmh5v4KM8g1ozif
+ErJFw6hzZ5fmI5r8bhE4JzlscnhKZU
+F7NSTjWvQJyBburN7CXRUlbgp2dIrA
+Fi4rJeTQq4eXj8Lxg3Hja5hBVTVV5u
+H5j5ZHy1FGesOAHjkQEDYCucbpKWRu
+HKSMQ9nTnwXCJIte1JrM1dtYnDtJ8g
+IWl0G3ZlMNf7WT8yjIB49cx7MmYOmr
+IZTkHMLvIKuiLjhDjYMmIHxh166we4
+Ig1QcuKsjHXkproePdERo2w0mYzIqd
+JHNgc2UCaiXOdmkxwDDyGhRlO0mnBQ
+JN0VclewmjwYlSl8386MlWv5rEhWCz
+JafwVLSVk5AVoXFuzclesQ000EE2k1
+KJFcmTVjdkCMv94wYCtfHMFhzyRsmH
+Ktb7GQ0N1DrxwkCkEUsTaIXk0xYinn
+Ld2ej8NEv5zNcqU60FwpHeZKBhfpiV
+LiEBxds3X0Uw0lxiYjDqrkAaAwoiIW
+MXhhH1Var3OzzJCtI9VNyYvA0q8UyJ
+MeSTAXq8gVxVjbEjgkvU9YLte0X9uE
+NEhyk8uIx4kEULJGa8qIyFjjBcP2G6
+O66j6PaYuZhEUtqV6fuU7TyjM2WxC5
+OF7fQ37GzaZ5ikA2oMyvleKtgnLjXh
+OPwBqCEK5PWTjWaiOyL45u2NLTaDWv
+Oq6J4Rx6nde0YlhOIJkFsX2MsSvAQ0
+Ow5PGpfTm4dXCfTDsXAOTatXRoAydR
+QEHVvcP8gxI6EMJIrvcnIhgzPNjIvv
+QJYm7YRA3YetcBHI5wkMZeLXVmfuNy
+QYlaIAnJA6r8rlAb6f59wcxvcPcWFf
+RilTlL1tKkPOUFuzmLydHAVZwv1OGl
+Sfx0vxv1skzZWT1PqVdoRDdO6Sb6xH
+TTQUwpMNSXZqVBKAFvXu7OlWvKXJKX
+TtDKUZxzVxsq758G6AWPSYuZgVgbcl
+VDhtJkYjAYPykCgOU9x3v7v3t4SO1a
+VY0zXmXeksCT8BzvpzpPLbmU9Kp9Y4
+Vp3gmWunM5A7wOC9YW2JroFqTWjvTi
+WHmjWk2AY4c6m7DA4GitUx6nmb1yYS
+XemNcT1xp61xcM1Qz3wZ1VECCnq06O
+Z2sWcQr0qyCJRMHDpRy3aQr7PkHtkK
+aDxBtor7Icd9C5hnTvvw5NrIre740e
+akiiY5N0I44CMwEnBL6RTBk7BRkxEj
+b3b9esRhTzFEawbs6XhpKnD9ojutHB
+bgK1r6v3BCTh0aejJUhkA1Hn6idXGp
+cBGc0kSm32ylBDnxogG727C0uhZEYZ
+cq4WSAIFwx3wwTUS5bp1wCe71R6U5I
+dVdvo6nUD5FgCgsbOZLds28RyGTpnx
+e2Gh6Ov8XkXoFdJWhl0EjwEHlMDYyG
+f9ALCzwDAKmdu7Rk2msJaB1wxe5IBX
+fuyvs0w7WsKSlXqJ1e6HFSoLmx03AG
+gTpyQnEODMcpsPnJMZC66gh33i3m0b
+gpo8K5qtYePve6jyPt6xgJx4YOVjms
+gxfHWUF8XgY2KdFxigxvNEXe2V2XMl
+i6RQVXKUh7MzuGMDaNclUYnFUAireU
+ioEncce3mPOXD2hWhpZpCPWGATG6GU
+jQimhdepw3GKmioWUlVSWeBVRKFkY3
+l7uwDoTepWwnAP0ufqtHJS3CRi7RfP
+lqhzgLsXZ8JhtpeeUWWNbMz8PHI705
+m6jD0LBIQWaMfenwRCTANI9eOdyyto
+mhjME0zBHbrK6NMkytMTQzOssOa1gF
+mzbkwXKrPeZnxg2Kn1LRF5hYSsmksS
+nYVJnVicpGRqKZibHyBAmtmzBXAFfT
+oHJMNvWuunsIMIWFnYG31RCfkOo2V7
+oLZ21P2JEDooxV1pU31cIxQHEeeoLu
+okOkcWflkNXIy4R8LzmySyY1EC3sYd
+pLk3i59bZwd5KBZrI1FiweYTd5hteG
+pTeu0WMjBRTaNRT15rLCuEh3tBJVc5
+qnPOOmslCJaT45buUisMRnM0rc77EK
+t6fQUjJejPcjc04wHvHTPe55S65B4V
+ukOiFGGFnQJDHFgZxHMpvhD3zybF0M
+ukyD7b0Efj7tNlFSRmzZ0IqkEzg2a8
+waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs
+wwXqSGKLyBQyPkonlzBNYUJTCo4LRS
+xipQ93429ksjNcXPX5326VSg1xJZcW
+y7C453hRWd4E7ImjNDWlpexB8nUqjh
+ydkwycaISlYSlEq3TlkS2m15I2pcp8
+
+
+query TIIIIIIIITRRT
+select * from aggregate_test_100 ORDER BY c13 desc limit 5;
+----
+a 4 -38 20744 762932956 308913475857409919 7 45465 1787652631 878137512938218976 0.7459874 0.021825780392 ydkwycaISlYSlEq3TlkS2m15I2pcp8
+d 1 -98 13630 -1991133944 1184110014998006843 220 2986 225513085 9634106610243643486 0.89651865 0.164088254508 y7C453hRWd4E7ImjNDWlpexB8nUqjh
+e 2 52 -12056 -1090239422 9011500141803970147 238 4168 2013662838 12565360638488684051 0.6694766 0.391444365692 xipQ93429ksjNcXPX5326VSg1xJZcW
+d 1 -72 25590 1188089983 3090286296481837049 241 832 3542840110 5885937420286765261 0.41980565 0.215354023438 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS
+a 1 -5 12636 794623392 2909750622865366631 15 24022 2669374863 4776679784701509574 0.29877836 0.253725340799 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs
+
+
+
+## -- make tiny batches to trigger batch compaction
+statement ok
+set datafusion.execution.batch_size = 2
+
+query TIIIIIIIITRRT
+select * from aggregate_test_100 ORDER BY c13 desc limit 5;
+----
+a 4 -38 20744 762932956 308913475857409919 7 45465 1787652631 878137512938218976 0.7459874 0.021825780392 ydkwycaISlYSlEq3TlkS2m15I2pcp8
+d 1 -98 13630 -1991133944 1184110014998006843 220 2986 225513085 9634106610243643486 0.89651865 0.164088254508 y7C453hRWd4E7ImjNDWlpexB8nUqjh
+e 2 52 -12056 -1090239422 9011500141803970147 238 4168 2013662838 12565360638488684051 0.6694766 0.391444365692 xipQ93429ksjNcXPX5326VSg1xJZcW
+d 1 -72 25590 1188089983 3090286296481837049 241 832 3542840110 5885937420286765261 0.41980565 0.215354023438 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS
+a 1 -5 12636 794623392 2909750622865366631 15 24022 2669374863 4776679784701509574 0.29877836 0.253725340799 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs
+
+
+## make an example for

Review Comment:
   ```suggestion
   ## make an example for dictionary encoding
   ```
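The expected results of the first `topk.slt` queries can be cross-checked with a small standalone sketch: feed the same eleven values in chunks of two (mimicking `datafusion.execution.batch_size = 2`), keep only the k smallest in a `std::collections::BinaryHeap`, and flip the ordering for `DESC` with `std::cmp::Reverse`. The helper `top_k` is hypothetical and only illustrates why chunking must not change the answer; it is not how DataFusion executes the query.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Returns the `k` smallest values across all `chunks`, in ascending order.
/// Each chunk plays the role of one input RecordBatch.
fn top_k<T: Ord + Clone>(chunks: &[&[T]], k: usize) -> Vec<T> {
    // max-heap: the root is the largest value currently retained
    let mut heap: BinaryHeap<T> = BinaryHeap::with_capacity(k + 1);
    for chunk in chunks {
        for v in chunk.iter() {
            if heap.len() < k {
                heap.push(v.clone());
            } else if heap.peek().map_or(false, |max| v < max) {
                // v beats the current max: evict the max, keep v
                heap.pop();
                heap.push(v.clone());
            }
        }
    }
    heap.into_sorted_vec()
}

fn main() {
    let x = [10, 2, 3, 0, 5, 4, 3, 2, 1, 3, 8];
    // batch_size = 2: the input arrives as chunks of at most two rows
    let chunks: Vec<&[i32]> = x.chunks(2).collect();

    // ORDER BY x LIMIT 3
    let asc = top_k(&chunks, 3);

    // ORDER BY x DESC LIMIT 3: Reverse turns "largest" into "smallest"
    let rev: Vec<Reverse<i32>> = x.iter().map(|&v| Reverse(v)).collect();
    let rev_chunks: Vec<&[Reverse<i32>]> = rev.chunks(2).collect();
    let desc: Vec<i32> = top_k(&rev_chunks, 3).into_iter().map(|Reverse(v)| v).collect();

    println!("asc = {:?}, desc = {:?}", asc, desc);
}
```

This should agree with the `limit 3` and `desc limit 3` expectations above (`0 1 2` and `10 8 5`).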



Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1344112214


##########
datafusion/sqllogictest/test_files/aal.slt:
##########
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   this file probably at least needs to be renamed to topk.slt or something other than my initials (`AAL` 😆 )
   
   I am also not sure it is the best test of the topk coverage -- I had this for quickly iterating during development



##########
datafusion/sqllogictest/test_files/window.slt:
##########
@@ -2673,14 +2673,14 @@ SELECT
   LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS leadr1,
   LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as leadr2
   FROM annotated_data_finite
-  ORDER BY ts DESC
+  ORDER BY ts DESC, fv2
   LIMIT 5;
 ----
-289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289
-289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286
-289 261 296 301 NULL 275 98 98 98 98 85 85 291 289 291 1004 305 305 296 291 301 305 301 283
-286 259 291 296 NULL 272 97 97 97 97 84 84 289 286 289 1004 305 305 291 289 296 301 296 278
-275 254 289 291 289 269 96 96 96 96 83 83 286 283 286 305 305 305 289 286 291 296 291 275
+264 289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286

Review Comment:
   added ts to show that the first two values are tied, and that the output is correct ✅ 
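One way to see why a tiebreaker such as `fv2` matters: when the sort key has ties, `ORDER BY ts DESC LIMIT n` has several correct answers, and which tied row comes out depends on input order and operator internals. The sketch below uses made-up `(ts, fv2)` pairs (not the `annotated_data_finite` data) and a stable `sort_by`; `order_limit` is a hypothetical helper.

```rust
/// Applies `ORDER BY ts DESC [, fv2 ASC] LIMIT limit` to `(ts, fv2)` rows.
fn order_limit(mut rows: Vec<(i32, i32)>, limit: usize, tiebreak: bool) -> Vec<(i32, i32)> {
    // sort_by is stable, so rows that compare equal keep their input order
    rows.sort_by(|a, b| {
        let by_ts = b.0.cmp(&a.0); // ts DESC
        if tiebreak {
            by_ts.then(a.1.cmp(&b.1)) // then fv2 ASC
        } else {
            by_ts
        }
    });
    rows.truncate(limit);
    rows
}

fn main() {
    // made-up rows: two of them tie on ts = 10 and arrive in different orders
    let run1 = vec![(10, 7), (10, 3), (5, 1)];
    let run2 = vec![(10, 3), (10, 7), (5, 1)];
    println!("ts only: {:?} vs {:?}",
        order_limit(run1.clone(), 1, false), order_limit(run2.clone(), 1, false));
    println!("ts, fv2: {:?} vs {:?}",
        order_limit(run1, 1, true), order_limit(run2, 1, true));
}
```

With `ts` alone the two runs return different (but equally valid) rows; with the extra key both return `(10, 3)`, which is what makes the expected output in a `.slt` file stable.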



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by keeping track of only the top
+/// K elements, which reduces the required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guesstimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // heap does not yet have k items, or the new row is smaller than the current max
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() < batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK", for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3 (`1, 2, 3`), not the *largest* 3 (`3, 4, 5`).
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It doesn't use `BinaryHeap` in the Rust standard library because
+/// it is important to check the current minimum value in the heap
+/// prior to creating a new value to insert.

Review Comment:
   I think this comment is out of date (and mostly due to my misunderstanding of how the BinaryHeap worked 😆 )
   
   ```suggestion
   ```
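For reference, `std::collections::BinaryHeap` does allow checking the current maximum before building a new entry: `peek` is O(1), and `peek_mut` returns a `PeekMut` guard that lets you overwrite the root in place, with the heap order restored when the guard is dropped. A minimal sketch of a k-smallest heap using that pattern; `offer` is a hypothetical helper, and replacing the root via `peek_mut` is an alternative to the `pop` + `push` in the PR's `add`, not what the PR does.

```rust
use std::collections::BinaryHeap;

/// Offers `key` to a max-heap that retains the `k` smallest keys.
fn offer(heap: &mut BinaryHeap<Vec<u8>>, k: usize, key: &[u8]) {
    if heap.len() < k {
        heap.push(key.to_vec());
        return;
    }
    // inspect the current max before doing any work for the candidate
    if let Some(mut top) = heap.peek_mut() {
        if key < top.as_slice() {
            // reuse the evicted key's allocation; dropping `top` sifts it down
            top.clear();
            top.extend_from_slice(key);
        }
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    let keys: [&[u8]; 5] = [b"d", b"b", b"e", b"a", b"c"];
    for key in keys.iter() {
        offer(&mut heap, 2, key);
    }
    let kept: Vec<String> = heap
        .into_sorted_vec()
        .iter()
        .map(|v| String::from_utf8_lossy(v).into_owned())
        .collect();
    println!("{:?}", kept);
}
```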



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:

Review Comment:
   I like the suggestion to try and filter earlier. However, I think it should be driven by profiling and benchmarks. Maybe put that idea in comments as a "Potential future improvement" ?
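One shape the "filter earlier" idea could take, sketched here as a hypothetical "Potential future improvement" rather than anything the PR implements: compare each row's sort key against the heap's current maximum up front, and only register the batch when at least one row survives. Because the heap max can only decrease while rows are inserted, this filter never drops a row the heap would have kept, but survivors still need the normal check on insert. `candidate_rows` is an illustrative helper; whether it pays off should be decided by profiling.

```rust
/// Indices of rows whose key is strictly below `threshold` (the heap's
/// current max). `None` means the heap is not yet full, so every row is a
/// candidate. Survivors must still be re-checked as the max shrinks.
fn candidate_rows(keys: &[Vec<u8>], threshold: Option<&[u8]>) -> Vec<usize> {
    match threshold {
        None => (0..keys.len()).collect(),
        Some(max) => keys
            .iter()
            .enumerate()
            .filter(|(_, key)| key.as_slice() < max)
            .map(|(i, _)| i)
            .collect(),
    }
}

fn main() {
    let keys: Vec<Vec<u8>> = vec![vec![9], vec![2], vec![7], vec![1]];
    let max: Vec<u8> = vec![5];
    let survivors = candidate_rows(&keys, Some(max.as_slice()));
    if survivors.is_empty() {
        // nothing in this batch can enter the top k: skip registering it
        println!("skip batch");
    } else {
        println!("check rows {:?}", survivors);
    }
}
```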



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by keeping track of only the top
+/// K elements, which reduces the required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guesstimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than or equal to
+                // the current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items, or the new item is lower than the current max of the k lowest values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() <= batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. Although it is called "topK",
+/// for values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3, `1, 2, 3`, not the *largest* 3, `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It doesn't use `BinaryHeap` in the Rust standard library because
+/// it is important to check the current minimum value in the heap
+/// prior to creating a new value to insert.

Review Comment:
   I think this comment is out of date (and mostly due to my misunderstanding of how the BinaryHeap worked 😆 )
   
   ```suggestion
   ```
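The reviewer's point can be illustrated with a minimal sketch: `std::collections::BinaryHeap` is a max-heap whose `peek` and `peek_mut` expose the current maximum before anything is inserted, which is all a bounded "k smallest" structure needs. `SmallestK` is a hypothetical name, not the PR's `TopKHeap`, and it stores plain `Ord` values rather than row-encoded sort keys.

```rust
use std::collections::BinaryHeap;

/// Keeps the `k` smallest values offered so far (hypothetical helper).
/// `BinaryHeap` is a max-heap, so its top is the largest retained value,
/// i.e. the next candidate for eviction.
struct SmallestK<T: Ord> {
    k: usize,
    heap: BinaryHeap<T>,
}

impl<T: Ord> SmallestK<T> {
    fn new(k: usize) -> Self {
        assert!(k > 0, "k must be positive");
        Self { k, heap: BinaryHeap::with_capacity(k) }
    }

    /// Offers `value`; returns true if it was retained.
    fn offer(&mut self, value: T) -> bool {
        if self.heap.len() < self.k {
            // Not full yet: every value is kept.
            self.heap.push(value);
            return true;
        }
        // Full: inspect the current max *before* doing any work.
        let mut top = self.heap.peek_mut().expect("full heap is non-empty");
        if value < *top {
            // Overwrite the max in place; the heap order is restored
            // when the `PeekMut` guard is dropped.
            *top = value;
            true
        } else {
            false
        }
    }

    /// Consumes the structure, returning retained values in ascending order.
    fn into_sorted(self) -> Vec<T> {
        self.heap.into_sorted_vec()
    }
}
```

Rejecting values equal to the current max mirrors the `row.as_ref() >= max_row.row()` check in `insert_batch`.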

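`TopK::emit` above splits its single output batch into `batch_size` pieces with `RecordBatch::slice`. The offset arithmetic can be sketched independently of Arrow; `chunk_ranges` is a hypothetical helper whose `(offset, len)` pairs would each correspond to one `batch.slice(offset, len)` call. A loop that stops only once fewer than `batch_size` rows remain emits a trailing empty slice when the row count is an exact multiple of `batch_size`; bounding the loop on the offset avoids that.

```rust
/// Splits `total` rows into `(offset, len)` ranges of at most `batch_size`
/// rows each, never producing a trailing empty range (hypothetical helper).
fn chunk_ranges(total: usize, batch_size: usize) -> Vec<(usize, usize)> {
    // A zero batch size would never make progress.
    assert!(batch_size > 0, "batch_size must be positive");
    let mut ranges = Vec::new();
    let mut offset = 0;
    while offset < total {
        // The last chunk may be shorter than `batch_size`.
        let len = batch_size.min(total - offset);
        ranges.push((offset, len));
        offset += len;
    }
    ranges
}
```

Note that for `total == 0` this yields no ranges, whereas `emit` as written always produces at least one (possibly empty) batch; which is preferable depends on what downstream consumers expect.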


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. Although it is called "topK",
+/// for values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3, `1, 2, 3`, not the *largest* 3, `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It doesn't use `BinaryHeap` in the Rust standard library because
+/// it is important to check the current minimum value in the heap
+/// prior to creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elements to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for at most `k` items using a `BinaryHeap`, a max-heap,
+    /// so the largest of the k smallest items seen so far is on top
+    inner: BinaryHeap<TopKRow>,
+    /// Stores the original row values (`TopKRow` only holds the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, first removes the current largest
+    /// item (the top of the heap).
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at a time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)

Review Comment:
   This is like `interleave` but for `RecordBatches`. @wjones127 was asking about something similar for `take` for `RecordBatches` as well. Maybe it would be a helpful API to add 🤔 
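A rough sketch of what an "interleave for record batches" helper would do, using plain `Vec`s in place of Arrow arrays. `Batch` and `interleave_batches` are hypothetical; an Arrow version would presumably call `arrow::compute::interleave` once per column with the same `(array_index, row_index)` pairs, as the hunk above does.

```rust
/// A toy "record batch": one `Vec` per column, all of equal length.
type Batch = Vec<Vec<i64>>;

/// For each `(batch_index, row_index)` in `indices`, copies that row out of
/// `batches`, building the output one column at a time (hypothetical helper).
/// Panics if an index is out of bounds.
fn interleave_batches(batches: &[Batch], indices: &[(usize, usize)]) -> Batch {
    // All input batches are assumed to share the same schema.
    let num_columns = batches.first().map_or(0, |b| b.len());
    (0..num_columns)
        .map(|col| {
            indices
                .iter()
                .map(|&(b, row)| batches[b][col][row])
                .collect::<Vec<i64>>()
        })
        .collect()
}
```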

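`TopKHeap::add` above keeps a per-batch `uses` count so that a buffered batch can be released once none of its rows remain in the heap. The bodies of `register`, `insert` and `unuse` on `RecordBatchStore` are not part of this hunk, so the following is only a guess at that bookkeeping under stated assumptions: `BatchStore` and `Entry` are hypothetical, `insert` is assumed to skip batches with no retained rows, and `unuse` is assumed to free a batch when its count reaches zero.

```rust
use std::collections::HashMap;

/// A batch that has been assigned an id but not yet stored (hypothetical).
struct Entry<B> {
    id: u32,
    batch: B,
    /// Number of heap rows that reference this batch.
    uses: usize,
}

/// Hypothetical store mapping batch ids to (batch, use count).
struct BatchStore<B> {
    next_id: u32,
    batches: HashMap<u32, (B, usize)>,
}

impl<B> BatchStore<B> {
    fn new() -> Self {
        Self { next_id: 0, batches: HashMap::new() }
    }

    /// Assigns an id; uses are counted on the entry before it is stored.
    fn register(&mut self, batch: B) -> Entry<B> {
        let id = self.next_id;
        self.next_id += 1;
        Entry { id, batch, uses: 0 }
    }

    /// Keeps the batch only if at least one of its rows entered the heap.
    fn insert(&mut self, entry: Entry<B>) {
        if entry.uses > 0 {
            self.batches.insert(entry.id, (entry.batch, entry.uses));
        }
    }

    /// Drops one reference to batch `id`, freeing it when none remain.
    fn unuse(&mut self, id: u32) {
        let now_unused = match self.batches.get_mut(&id) {
            Some(entry) => {
                entry.1 -= 1;
                entry.1 == 0
            }
            None => false,
        };
        if now_unused {
            self.batches.remove(&id);
        }
    }

    /// Number of batches currently retained.
    fn len(&self) -> usize {
        self.batches.len()
    }
}
```

In this model the batch being processed is not in the store until `insert` runs, which would explain why `add` decrements `batch_entry.uses` directly when the evicted row came from that same batch.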


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;

Review Comment:
   ```suggestion
           // TODO there is potential to add special cases for single column sort fields
           // to improve performance
           let row_converter = RowConverter::new(sort_fields)?;
   ```
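To make the suggested TODO more concrete: the row format turns sort keys into byte strings whose lexicographic order matches the requested sort order. The sketch below shows the idea for a single non-null `i32` column. `encode_i32` is hypothetical and only similar in spirit to Arrow's encoding, which also handles nulls, multiple columns and variable-length types.

```rust
/// Encodes `v` so that comparing the bytes lexicographically gives the same
/// order as comparing the integers (hypothetical helper). Flipping the sign
/// bit maps i32::MIN..=i32::MAX onto 0..=u32::MAX, and big-endian puts the
/// most significant byte first.
fn encode_i32(v: i32, descending: bool) -> [u8; 4] {
    let mut bytes = ((v as u32) ^ 0x8000_0000).to_be_bytes();
    if descending {
        // Inverting every bit reverses the byte-wise order.
        for b in bytes.iter_mut() {
            *b = !*b;
        }
    }
    bytes
}
```

For a single primitive column, comparing the native values directly gives the same order without the encode-and-copy step, which appears to be the kind of special case the TODO has in mind.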



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -765,7 +766,12 @@ impl DisplayAs for SortExec {
                 let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
                 match self.fetch {
                     Some(fetch) => {
-                        write!(f, "SortExec: fetch={fetch}, expr=[{}]", expr.join(","))
+                        write!(
+                            f,
+                            // TODO should this say topk?

Review Comment:
   I think in general it would be good to be able to tell what operator was going to be used from looking at the plan. However, I think we can do so as a follow-on PR -- I can file a ticket. 
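One way to answer the `TODO should this say topk?` question in the hunk above would be to name the strategy in the display string whenever `fetch` is set. This sketch uses a hypothetical, trimmed-down `SortDisplay` struct and an illustrative `TopK(fetch=N)` format; it assumes a `fetch` always selects the TopK path and is not the format this PR adopts.

```rust
use std::fmt;

/// Hypothetical stand-in for the fields `SortExec` displays.
struct SortDisplay {
    fetch: Option<usize>,
    expr: Vec<String>,
}

impl fmt::Display for SortDisplay {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let expr = self.expr.join(",");
        match self.fetch {
            // Naming the strategy makes EXPLAIN output show which path runs.
            Some(fetch) => write!(f, "SortExec: TopK(fetch={fetch}), expr=[{expr}]"),
            None => write!(f, "SortExec: expr=[{expr}]"),
        }
    }
}
```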



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3, `1, 2, 3`, not the *largest* 3, `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It wraps `BinaryHeap` from the Rust standard library, rather than
+/// using it directly, because it is important to check the current
+/// maximum value in the heap prior to creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elements to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for at most `k` items using a `BinaryHeap` (a max-heap),
+    /// so the largest of the smallest k values seen so far is on top
+    inner: BinaryHeap<TopKRow>,
+    /// Stores the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, first removes the current largest
+    /// item (the heap's maximum).
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at a time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)

Review Comment:
   This is like `interleave` but for `RecordBatch`es. @wjones127 was asking about something similar for `take` for `RecordBatch`es as well. Maybe it would be a helpful API to add 🤔 
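As a rough illustration of the batch-level `interleave` idea, here is a standard-library-only sketch. It models Arrow arrays as plain `Vec<Option<i64>>` columns; `Column`, `Batch`, and `interleave_batches` are hypothetical names, not arrow-rs APIs. Like `emit_with_state`, it builds the output one column at a time from `(batch, row)` index pairs, which play the role of the `(array_index, row_index)` pairs passed to `arrow::compute::interleave`.

```rust
/// A toy column of nullable integers. It stands in for an Arrow array only
/// for illustration; it is not the arrow-rs `Array` type.
type Column = Vec<Option<i64>>;

/// A toy "record batch": equal-length columns that share one schema.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    columns: Vec<Column>,
}

/// Hypothetical helper (not an arrow-rs API): build a new batch whose i-th row
/// is row `indices[i].1` of batch `indices[i].0`. Returns `None` when no input
/// batches are given. Panics if an index is out of bounds.
fn interleave_batches(batches: &[&Batch], indices: &[(usize, usize)]) -> Option<Batch> {
    let num_columns = batches.first()?.columns.len();
    // Build the output one column at a time, as `emit_with_state` does.
    let columns: Vec<Column> = (0..num_columns)
        .map(|col| {
            indices
                .iter()
                .map(|&(b, row)| batches[b].columns[col][row])
                .collect::<Column>()
        })
        .collect();
    Some(Batch { columns })
}
```

One difference from `emit_with_state`: that code passes one array reference per output row (so its `indices` are `(i, k.index)`), while this sketch passes each batch once and addresses it by position. Both forms select the same rows.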


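The slicing loop in `TopK::emit` pushes one extra zero-row batch whenever the output row count is an exact multiple of `batch_size`: the last full slice leaves an empty remainder, and the next iteration pushes it because `0 < batch_size`. A minimal sketch of computing the `(offset, length)` pairs up front instead, assuming each pair would then be passed to `RecordBatch::slice`; `chunk_ranges` is a hypothetical helper.

```rust
/// Split `num_rows` rows into `(offset, length)` pairs of at most
/// `batch_size` rows each. An exact multiple of `batch_size` yields only
/// full slices, and zero rows yield no slices at all.
fn chunk_ranges(num_rows: usize, batch_size: usize) -> Vec<(usize, usize)> {
    // `step_by(0)` would panic, so reject a zero batch size explicitly.
    assert!(batch_size > 0, "batch_size must be positive");
    (0..num_rows)
        .step_by(batch_size)
        .map(|offset| (offset, batch_size.min(num_rows - offset)))
        .collect()
}
```

With zero rows this yields no slices, whereas the loop in the diff emits a single empty batch; which behavior downstream operators expect is a design choice this sketch does not settle.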

##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guesstimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among

Review Comment:
   ```suggestion
       /// Insert `batch`, remembering if any of its values are among
   ```
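The per-row check in `insert_batch`, together with the eviction in `TopKHeap::add`, can be modeled with the standard library alone. A minimal sketch, assuming sort keys are already encoded as byte strings that compare in the desired order (as `arrow::row` rows are designed to); `SmallestK` and `offer` are hypothetical names. Unlike the diff, it allocates a fresh `Vec` per accepted key instead of reusing the evicted row's buffer, and it does not track which batch a row came from.

```rust
use std::collections::BinaryHeap;

/// Keeps the `k` smallest byte-encoded sort keys seen so far.
/// `BinaryHeap` is a max-heap, so `peek()` returns the largest retained key:
/// the threshold a new key must beat to enter the top k.
struct SmallestK {
    k: usize,
    heap: BinaryHeap<Vec<u8>>,
    /// Number of keys pushed into the heap (initial fills plus replacements).
    replacements: usize,
}

impl SmallestK {
    fn new(k: usize) -> Self {
        assert!(k > 0, "k must be positive");
        Self { k, heap: BinaryHeap::new(), replacements: 0 }
    }

    /// Offer one encoded key. The current maximum is checked *before* an
    /// owned copy of the key is allocated.
    fn offer(&mut self, key: &[u8]) {
        if self.heap.len() == self.k {
            // Full heap: a key >= the current max cannot be in the top k.
            if key >= self.heap.peek().unwrap().as_slice() {
                return;
            }
            // Evict the current largest retained key to make room.
            self.heap.pop();
        }
        self.heap.push(key.to_vec());
        self.replacements += 1;
    }

    /// Consume the structure, returning the keys from smallest to largest.
    fn into_sorted(self) -> Vec<Vec<u8>> {
        self.heap.into_sorted_vec()
    }
}
```

Keys equal to the current maximum are skipped, matching the `>=` guard in `insert_batch`, so ties keep the earlier row.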



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.

Review Comment:
   ```suggestion
   /// K=3 elements, reducing the total amount of required buffer memory.
   ```
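The SQL example asks for the top 3 by `revenue DESC`, while `TopKHeap` always keeps the *smallest* k sort keys. That works because the keys are encoded so that byte order matches the requested order. A simplified sketch of such an encoding for non-null `i64` values, in the spirit of the `arrow::row` format but not its exact layout (there is no null sentinel here); `encode_key` is a hypothetical helper. Flipping the sign bit and writing big-endian makes byte order follow numeric order, and inverting every byte reverses it for `DESC`.

```rust
/// Encode an `i64` so that lexicographic comparison of the returned bytes
/// matches the requested sort order. Nulls are not handled in this sketch.
fn encode_key(value: i64, descending: bool) -> [u8; 8] {
    // Flip the sign bit so negative values sort before positive ones, then
    // use big-endian so the most significant byte is compared first.
    let mut bytes = ((value as u64) ^ (1u64 << 63)).to_be_bytes();
    if descending {
        // Inverting every byte reverses the ordering of the encoded keys.
        for b in bytes.iter_mut() {
            *b = !*b;
        }
    }
    bytes
}
```

Keeping the 3 smallest encoded keys with `descending = true` therefore keeps the 3 largest revenues.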



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at a time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)
+            .map(|col| {
+                let input_arrays: Vec<_> = topk_rows
+                    .iter()
+                    .map(|k| {
+                        let entry =
+                            self.store.get(k.batch_id).expect("invalid stored batch id");
+                        entry.batch.column(col) as &dyn Array
+                    })
+                    .collect();
+
+                // at this point `indices` contains indexes within the
+                // rows and `input_arrays` contains a reference to the
+                // relevant Array for that index. `interleave` pulls
+                // them together into a single new array
+                Ok(interleave(&input_arrays, &indices)?)
+            })
+            .collect::<Result<_>>()?;
+
+        let new_batch = RecordBatch::try_new(schema, output_columns)?;
+        Ok((new_batch, topk_rows))
+    }
+
+    /// Compact this heap, rewriting all stored batches into a single
+    /// input batch
+    pub fn maybe_compact(&mut self) -> Result<()> {
+        // we compact if the number of "unused" rows in the store is

Review Comment:
   @Dandandan did you review this heuristic? I remember I tried it on our high-cardinality tracing use case, and it seemed to work well (basically, it is needed for large N with random-ish inputs).
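The heuristic itself is cut off in this excerpt, so it is not reproduced here. What it builds on is the per-batch use counting in `TopKHeap::add`: each retained row bumps its batch's `uses`, and evicting a row from an earlier batch calls `store.unuse`. A minimal sketch of that bookkeeping, assuming a batch can be dropped once no retained row points into it; `BatchStore` and its methods are hypothetical and simplified (the caller passes the final use count up front rather than registering the batch first).

```rust
use std::collections::HashMap;

/// Hypothetical model of a batch store with per-batch use counts. A batch is
/// kept only while at least one retained top-k row still points into it.
struct BatchStore<B> {
    next_id: u32,
    /// batch id -> (batch, number of retained rows that reference it)
    batches: HashMap<u32, (B, usize)>,
}

impl<B> BatchStore<B> {
    fn new() -> Self {
        Self { next_id: 0, batches: HashMap::new() }
    }

    /// Store `batch` with `uses` retained rows. A batch that contributed no
    /// rows is not stored at all, and `None` is returned.
    fn insert(&mut self, batch: B, uses: usize) -> Option<u32> {
        if uses == 0 {
            return None;
        }
        let id = self.next_id;
        self.next_id += 1;
        self.batches.insert(id, (batch, uses));
        Some(id)
    }

    /// Record that one retained row from batch `id` was evicted, and drop the
    /// batch once nothing references it. Panics on an unknown id.
    fn unuse(&mut self, id: u32) {
        let entry = self.batches.get_mut(&id).expect("unknown batch id");
        entry.1 -= 1;
        if entry.1 == 0 {
            self.batches.remove(&id);
        }
    }

    fn get(&self, id: u32) -> Option<&B> {
        self.batches.get(&id).map(|(batch, _)| batch)
    }

    fn len(&self) -> usize {
        self.batches.len()
    }
}
```

Counting alone cannot free a large batch that still backs a single retained row; per the doc comment on `maybe_compact`, that is the case it addresses by rewriting stored batches into a single batch.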




Re: [PR] Topk [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1343962035


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guesstimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:

Review Comment:
   Is this TODO still applicable @alamb? I don't think we can do either of them, as we need to visit all input rows and compare them with the max.
   
   One possibility I see is to filter out rows early (per batch) that are greater than or equal to the current max, which might vectorize better than the per-row logic.
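A minimal sketch of that per-batch pre-filter, using plain `Vec<u8>` keys as a stand-in for the arrow `Rows` encoding (an assumption made to keep it self-contained). `candidate_indices` and `insert_batch_prefiltered` are hypothetical names. Filtering against the max captured at the start of the batch is safe because the heap max can only decrease while the batch is being inserted, so any row rejected up front would also be rejected by the per-row check; rows that survive the filter still need that per-row check.

```rust
use std::collections::BinaryHeap;

/// Indices of rows whose encoded sort key is strictly less than
/// `current_max`. With no max yet (heap not full), every row is a candidate.
pub fn candidate_indices(rows: &[Vec<u8>], current_max: Option<&[u8]>) -> Vec<usize> {
    match current_max {
        None => (0..rows.len()).collect(),
        Some(max) => rows
            .iter()
            .enumerate()
            .filter(|(_, row)| row.as_slice() < max)
            .map(|(i, _)| i)
            .collect(),
    }
}

/// Keeps the k smallest keys in a max-heap (largest retained key on top),
/// first discarding rows that cannot beat the max at the start of the batch.
pub fn insert_batch_prefiltered(heap: &mut BinaryHeap<Vec<u8>>, k: usize, rows: &[Vec<u8>]) {
    let max_at_start = if heap.len() < k { None } else { heap.peek().cloned() };
    for i in candidate_indices(rows, max_at_start.as_deref()) {
        let row = &rows[i];
        // the max may have shrunk during this batch, so re-check each row
        if heap.len() < k {
            heap.push(row.clone());
        } else if heap.peek().map_or(false, |max| row < max) {
            heap.pop();
            heap.push(row.clone());
        }
    }
}
```

As in the quoted `insert_batch`, a row equal to the current max is not inserted.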
   




Re: [PR] Topk [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1343969678


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -765,7 +766,12 @@ impl DisplayAs for SortExec {
                 let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
                 match self.fetch {
                     Some(fetch) => {
-                        write!(f, "SortExec: fetch={fetch}, expr=[{}]", expr.join(","))
+                        write!(
+                            f,
+                            // TODO should this say topk?

Review Comment:
   I'm not sure we want to do this. I think some other `ExecutionPlan` nodes already choose their algorithm based on one of their parameters (for example, the `mode` of HashAggregate).




Re: [PR] Topk [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1342670439


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,754 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::row::{RowConverter, Rows, SortField};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{
+    builder::StringBuilder, cast::AsArray, downcast_dictionary_array, Array, ArrayRef,
+    DictionaryArray, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch,
+    StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow_schema::{DataType, SchemaRef};
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```text
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guesstimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is >= the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items, or the new item is below the current max
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() < batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3, `1, 2, 3`, not the *largest* 3 `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It uses `BinaryHeap` from the Rust standard library as a max-heap,
+/// so the current maximum of the retained values can be checked (via
+/// `peek`) prior to creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elements to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for at most `k` items using a BinaryHeap (a max-heap),
+    /// so the largest of the smallest k values seen so far is on top
+    inner: BinaryHeap<TopKRow>,
+    /// Stores the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, first removes the current largest
+    /// item (the top of the max-heap).
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        let mut topk_rows = std::mem::take(&mut self.inner).into_vec();
+
+        // sort low to high
+        topk_rows.sort();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at a time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)
+            .map(|col| {
+                let input_arrays: Vec<_> = topk_rows
+                    .iter()
+                    .map(|k| {
+                        let entry =
+                            self.store.get(k.batch_id).expect("invalid stored batch id");
+                        entry.batch.column(col) as &dyn Array
+                    })
+                    .collect();
+
+                // at this point `indices` contains indexes within the
+                // rows and `input_arrays` contains a reference to the
+                // relevant Array for that index. `interleave` pulls
+                // them together into a single new array
+                Ok(interleave(&input_arrays, &indices)?)
+            })
+            .collect::<Result<_>>()?;
+
+        let new_batch = RecordBatch::try_new(schema, output_columns)?;
+        Ok((new_batch, topk_rows))
+    }
+
+    /// If too many unused rows have accumulated, compact this heap by
+    /// rewriting all stored batches into a single batch
+    pub fn maybe_compact(&mut self) -> Result<()> {
+        // we compact if the number of "unused" rows in the store is
+        // past some pre-defined threshold. Target holding up to
+        // around 20 batches, but handle cases of large k where some
+        // batches might be partially full
+        let max_unused_rows = (20 * self.batch_size) + self.k;
+        let unused_rows = self.store.unused_rows();
+
+        // don't compact if the store has two or fewer batches, or if
+        // there are fewer unused rows than the threshold
+        if self.store.len() <= 2 || unused_rows < max_unused_rows {
+            return Ok(());
+        }
+        // for now, always compact the entire thing into a new batch
+        // (maybe we can get fancier in the future about ignoring
+        // batches that already have a high usage ratio)
+
+        // Note: new batch is in the same order as inner
+        let num_rows = self.inner.len();
+        let (new_batch, mut topk_rows) = self.emit_with_state()?;
+
+        // clear all old entries in store (this invalidates all
+        // batch ids in `inner`)
+        self.store.clear();
+
+        let mut batch_entry = self.register_batch(new_batch);
+        batch_entry.uses = num_rows;
+
+        // rewrite all existing entries to use the new batch, and
+        // remove old entries. The sortedness and their relative
+        // position do not change
+        for (i, topk_row) in topk_rows.iter_mut().enumerate() {
+            topk_row.batch_id = batch_entry.id;
+            topk_row.index = i;
+        }
+        self.insert_batch_entry(batch_entry);
+        // restore the heap
+        self.inner = BinaryHeap::from(topk_rows);
+
+        Ok(())
+    }
+
+    /// return the size of memory used by this heap, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + (self.inner.capacity() * std::mem::size_of::<TopKRow>())
+            + self.store.size()
+            + self.owned_bytes
+    }
+}
+
+/// Represents one of the top K rows held in this heap. Orders
+/// according to memcmp of row (e.g. the arrow Row format, but could
+/// also be primitive values)
+///
+/// Reuses allocations to minimize runtime overhead of creating new Vecs
+#[derive(Debug, PartialEq)]
+struct TopKRow {
+    /// the value of the sort key for this row. This contains the
+    /// bytes that could be stored in `OwnedRow` but uses `Vec<u8>` to
+    /// reuse allocations.
+    row: Vec<u8>,
+    /// the RecordBatch this row came from: an id into a [`RecordBatchStore`]
+    batch_id: u32,
+    /// the index in this record batch the row came from
+    index: usize,
+}
+
+impl TopKRow {
+    /// Create a new TopKRow with new allocation
+    fn new(row: impl AsRef<[u8]>, batch_id: u32, index: usize) -> Self {
+        Self {
+            row: row.as_ref().to_vec(),
+            batch_id,
+            index,
+        }
+    }
+
+    /// Create a new TopKRow reusing the existing allocation
+    fn with_new_row(
+        self,
+        new_row: impl AsRef<[u8]>,
+        batch_id: u32,
+        index: usize,
+    ) -> Self {
+        let Self {
+            mut row,
+            batch_id: _,
+            index: _,
+        } = self;
+        row.clear();
+        row.extend_from_slice(new_row.as_ref());
+
+        Self {
+            row,
+            batch_id,
+            index,
+        }
+    }
+
+    /// Returns the number of bytes owned by this row in the heap (not
+    /// including itself)
+    fn owned_size(&self) -> usize {
+        self.row.capacity()
+    }
+
+    /// Returns a slice to the owned row value
+    fn row(&self) -> &[u8] {
+        self.row.as_slice()
+    }
+}
+
+impl Eq for TopKRow {}
+
+impl PartialOrd for TopKRow {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for TopKRow {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.row.cmp(&other.row)
+    }
+}
+
+#[derive(Debug)]
+struct RecordBatchEntry {
+    id: u32,
+    batch: RecordBatch,
+    // for this batch, how many times has it been used
+    uses: usize,
+}
+
+/// This structure tracks [`RecordBatch`] by an id so that:
+///
+/// 1. The batches can be tracked via an id that can be copied cheaply
+/// 2. The total memory held by all batches is tracked
+#[derive(Debug)]
+struct RecordBatchStore {
+    /// id generator
+    next_id: u32,
+    /// storage
+    batches: HashMap<u32, RecordBatchEntry>,
+    /// total size of all record batches tracked by this store
+    batches_size: usize,
+    /// schema of the batches
+    schema: SchemaRef,
+}
+
+impl RecordBatchStore {
+    fn new(schema: SchemaRef) -> Self {
+        Self {
+            next_id: 0,
+            batches: HashMap::new(),
+            batches_size: 0,
+            schema,
+        }
+    }
+
+    /// Register this batch with the store and assign an ID. No
+    /// attempt is made to compare this batch to other batches
+    pub fn register(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        let id = self.next_id;
+        self.next_id += 1;
+        RecordBatchEntry { id, batch, uses: 0 }
+    }
+
+    /// Insert a record batch entry into this store, tracking its
+    /// memory use, if it has any uses
+    pub fn insert(&mut self, entry: RecordBatchEntry) {
+        // uses of 0 means that none of the rows in the batch were stored in the topk
+        if entry.uses > 0 {
+            self.batches_size += entry.batch.get_array_memory_size();
+            self.batches.insert(entry.id, entry);
+        }
+    }
+
+    /// Clear all values in this store, invalidating all previous batch ids
+    fn clear(&mut self) {
+        self.batches.clear();
+        self.batches_size = 0;
+    }
+
+    fn get(&self, id: u32) -> Option<&RecordBatchEntry> {
+        self.batches.get(&id)
+    }
+
+    /// returns the total number of batches stored in this store
+    fn len(&self) -> usize {
+        self.batches.len()
+    }
+
+    /// Returns the total number of rows in batches minus the number
+    /// which are in use
+    fn unused_rows(&self) -> usize {
+        self.batches
+            .values()
+            .map(|batch_entry| batch_entry.batch.num_rows() - batch_entry.uses)
+            .sum()
+    }
+
+    /// returns true if the store has nothing stored
+    fn is_empty(&self) -> bool {
+        self.batches.is_empty()
+    }
+
+    /// return the schema of batches stored
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// remove a use from the specified batch id. If the use count
+    /// reaches zero the batch entry is removed from the store
+    ///
+    /// panics if there were no remaining uses of id
+    pub fn unuse(&mut self, id: u32) {
+        let remove = if let Some(batch_entry) = self.batches.get_mut(&id) {
+            batch_entry.uses = batch_entry.uses.checked_sub(1).expect("underflow");
+            batch_entry.uses == 0
+        } else {
+            panic!("No entry for id {id}");
+        };
+
+        if remove {
+            let old_entry = self.batches.remove(&id).unwrap();
+            self.batches_size = self
+                .batches_size
+                .checked_sub(old_entry.batch.get_array_memory_size())
+                .unwrap();
+        }
+    }
+
+    /// returns the size of memory used by this store, including all
+    /// referenced `RecordBatch`es, in bytes
+    pub fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.batches.capacity()
+                * (std::mem::size_of::<u32>() + std::mem::size_of::<RecordBatchEntry>())
+            + self.batches_size
+    }
+}
+
+/// wrapper over [`arrow::compute::interleave`] that re-encodes
+/// dictionaries that have a low usage (values referenced)
+fn interleave(values: &[&dyn Array], indices: &[(usize, usize)]) -> Result<ArrayRef> {
+    // for now, always re-encode only string dictionaries
+    if !values.is_empty() {
+        match values[0].data_type() {
+            DataType::Dictionary(_key_type, value_type)
+                if value_type.as_ref() == &DataType::Utf8 =>
+            {
+                return interleave_and_repack_dictionary(values, indices);
+            }
+            _ => {}
+        }
+    }
+    // fallback to arrow
+    Ok(arrow::compute::interleave(values, indices)?)
+}
+
+/// Special interleave kernel that re-creates the dictionary values,
+/// ensuring no unused space
+fn interleave_and_repack_dictionary(

Review Comment:
   BTW, I think @tustvold basically did this upstream in arrow 47.0.0, but now I am struggling to find the PR.
   
   In other words, I think we can remove the `interleave_and_repack_dictionary` code. I can test with my high-cardinality dictionary use case if that would help.
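As its doc comment says, `interleave_and_repack_dictionary` re-creates the dictionary values so the output holds no unused entries. A minimal model of that idea follows, using plain vectors instead of arrow's `DictionaryArray`; the `Dict` type and `interleave_and_repack` function are hypothetical illustrations, not arrow or DataFusion APIs. It takes `(source, row)` pairs, the same index shape as `interleave`, and stores each referenced string once.

```rust
use std::collections::HashMap;

/// Simplified, hypothetical model of a dictionary-encoded string column:
/// row `i` holds `values[keys[i]]`. Not arrow's `DictionaryArray`.
#[derive(Debug)]
pub struct Dict {
    pub keys: Vec<usize>,
    pub values: Vec<String>,
}

/// Picks rows from several dictionaries (each `(source, row)` pair selects
/// row `row` of `sources[source]`) and builds a fresh dictionary containing
/// only the values actually referenced, each stored once.
pub fn interleave_and_repack(sources: &[&Dict], indices: &[(usize, usize)]) -> Dict {
    // maps a value to its key in the new, compact dictionary
    let mut remap: HashMap<&str, usize> = HashMap::new();
    let mut values: Vec<String> = Vec::new();
    let mut keys = Vec::with_capacity(indices.len());

    for &(source, row) in indices {
        let dict = sources[source];
        let value: &str = &dict.values[dict.keys[row]];
        let new_key = *remap.entry(value).or_insert_with(|| {
            values.push(value.to_string());
            values.len() - 1
        });
        keys.push(new_key);
    }
    Dict { keys, values }
}
```

Values that no selected row references (such as an `"unused"` entry in a source dictionary) never reach the output, which is the "no unused space" property the original kernel aims for.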
   




Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#issuecomment-1749598438

   Filed 
   * https://github.com/apache/arrow-datafusion/issues/7749 to track adding fuzz tests
   * https://github.com/apache/arrow-datafusion/issues/7750 to track making it clearer when TopK is used in the plans
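Issue #7749 tracks adding fuzz tests. One common shape for such a test is differential: run the heap-based top-k and a full sort followed by a limit on the same random input and require identical output. The sketch below shows that idea on plain `u32` keys, with a tiny linear congruential generator so it needs no crates. All names are hypothetical, and it does not exercise the actual `TopK` operator.

```rust
use std::collections::BinaryHeap;

/// Tiny deterministic PRNG (32-bit linear congruential generator), used
/// only so this sketch needs no external crates.
struct Lcg(u32);

impl Lcg {
    fn next_u32(&mut self) -> u32 {
        self.0 = self.0.wrapping_mul(1_664_525).wrapping_add(1_013_904_223);
        // the high bits of an LCG are better distributed than the low bits
        self.0 >> 16
    }
}

/// Smallest `k` values, ascending, using a max-heap that never holds more
/// than `k` items (the strategy `TopKHeap` uses for encoded rows).
pub fn topk_heap(input: &[u32], k: usize) -> Vec<u32> {
    let mut heap = BinaryHeap::with_capacity(k);
    for &v in input {
        if heap.len() < k {
            heap.push(v);
        } else if heap.peek().map_or(false, |&max| v < max) {
            heap.pop();
            heap.push(v);
        }
    }
    heap.into_sorted_vec()
}

/// Reference implementation: sort everything, then apply LIMIT k.
pub fn topk_sort(input: &[u32], k: usize) -> Vec<u32> {
    let mut all = input.to_vec();
    all.sort_unstable();
    all.truncate(k);
    all
}

/// Runs both implementations on `iterations` random inputs and reports
/// whether they always agree.
pub fn fuzz_topk(seed: u32, iterations: usize) -> bool {
    let mut rng = Lcg(seed);
    (0..iterations).all(|_| {
        let len = (rng.next_u32() % 200) as usize;
        let k = (rng.next_u32() % 50) as usize;
        // a small value range forces many duplicate keys (ties)
        let input: Vec<u32> = (0..len).map(|_| rng.next_u32() % 20).collect();
        topk_heap(&input, k) == topk_sort(&input, k)
    })
}
```

A real fuzz test would presumably compare `SortExec` with and without `fetch` over random batches and sort options; that wiring is not shown here.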



Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan merged PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721



Re: [PR] Topk [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1342764263


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,754 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::row::{RowConverter, Rows, SortField};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{
+    builder::StringBuilder, cast::AsArray, downcast_dictionary_array, Array, ArrayRef,
+    DictionaryArray, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch,
+    StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow_schema::{DataType, SchemaRef};
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sort the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guesstimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than or equal
+                // to the current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items, or the new row is smaller than the current max
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() <= batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3, `1, 2, 3`, not the *largest* 3 `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It wraps the standard library `BinaryHeap` (a max-heap) rather than
+/// using it directly because it is important to check the current
+/// maximum value in the heap prior to creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elements to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for at most `k` items using a `BinaryHeap` (a max-heap),
+    /// so the largest of the k smallest rows seen so far is on top
+    inner: BinaryHeap<TopKRow>,
+    /// Stores the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, first removes the current largest
+    /// item (the top of the max-heap).
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        let mut topk_rows = std::mem::take(&mut self.inner).into_vec();
+
+        // sort low to high (reverse the reverse)
+        topk_rows.sort();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at a time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)
+            .map(|col| {
+                let input_arrays: Vec<_> = topk_rows
+                    .iter()
+                    .map(|k| {
+                        let entry =
+                            self.store.get(k.batch_id).expect("invalid stored batch id");
+                        entry.batch.column(col) as &dyn Array
+                    })
+                    .collect();
+
+                // at this point `indices` contains indexes within the
+                // rows and `input_arrays` contains a reference to the
+                // relevant Array for that index. `interleave` pulls
+                // them together into a single new array
+                Ok(interleave(&input_arrays, &indices)?)
+            })
+            .collect::<Result<_>>()?;
+
+        let new_batch = RecordBatch::try_new(schema, output_columns)?;
+        Ok((new_batch, topk_rows))
+    }
+
+    /// Compact this heap, rewriting all stored batches into a single
+    /// input batch
+    pub fn maybe_compact(&mut self) -> Result<()> {
+        // we compact if the number of "unused" rows in the store is
+        // past some pre-defined threshold. Target holding up to
+        // around 20 batches, but handle cases of large k where some
+        // batches might be partially full
+        let max_unused_rows = (20 * self.batch_size) + self.k;
+        let unused_rows = self.store.unused_rows();
+
+        // don't compact if the store has two or fewer batches, or if
+        // there are not yet enough unused rows to be worth rewriting
+        if self.store.len() <= 2 || unused_rows < max_unused_rows {
+            return Ok(());
+        }
+        // for now, always compact everything into a single new batch
+        // (in the future we could skip batches that already have a
+        // high usage ratio)
+
+        // Note: new batch is in the same order as inner
+        let num_rows = self.inner.len();
+        let (new_batch, mut topk_rows) = self.emit_with_state()?;
+
+        // clear all old entries in the store (this invalidates all
+        // `batch_id`s in `inner`)
+        self.store.clear();
+
+        let mut batch_entry = self.register_batch(new_batch);
+        batch_entry.uses = num_rows;
+
+        // rewrite all existing entries to use the new batch, and
+        // remove old entries. The sortedness and their relative
+        // position do not change
+        for (i, topk_row) in topk_rows.iter_mut().enumerate() {
+            topk_row.batch_id = batch_entry.id;
+            topk_row.index = i;
+        }
+        self.insert_batch_entry(batch_entry);
+        // restore the heap
+        self.inner = BinaryHeap::from(topk_rows);
+
+        Ok(())
+    }
+
+    /// return the size of memory used by this heap, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + (self.inner.capacity() * std::mem::size_of::<TopKRow>())
+            + self.store.size()
+            + self.owned_bytes
+    }
+}
+
+/// Represents one of the top K rows held in this heap. Orders
+/// according to memcmp of row (e.g. the arrow Row format, but could
+/// also be primitive values)
+///
+/// Reuses allocations to minimize runtime overhead of creating new Vecs
+#[derive(Debug, PartialEq)]
+struct TopKRow {
+    /// the value of the sort key for this row. This contains the
+    /// bytes that could be stored in `OwnedRow` but uses `Vec<u8>` to
+    /// reuse allocations.
+    row: Vec<u8>,
+    /// the RecordBatch this row came from: an id into a [`RecordBatchStore`]
+    batch_id: u32,
+    /// the index in this record batch the row came from
+    index: usize,
+}
+
+impl TopKRow {
+    /// Create a new TopKRow with new allocation
+    fn new(row: impl AsRef<[u8]>, batch_id: u32, index: usize) -> Self {
+        Self {
+            row: row.as_ref().to_vec(),
+            batch_id,
+            index,
+        }
+    }
+
+    /// Create a new TopKRow reusing the existing allocation
+    fn with_new_row(
+        self,
+        new_row: impl AsRef<[u8]>,
+        batch_id: u32,
+        index: usize,
+    ) -> Self {
+        let Self {
+            mut row,
+            batch_id: _,
+            index: _,
+        } = self;
+        row.clear();
+        row.extend_from_slice(new_row.as_ref());
+
+        Self {
+            row,
+            batch_id,
+            index,
+        }
+    }
+
+    /// Returns the number of bytes owned by this row in the heap (not
+    /// including itself)
+    fn owned_size(&self) -> usize {
+        self.row.capacity()
+    }
+
+    /// Returns a slice to the owned row value
+    fn row(&self) -> &[u8] {
+        self.row.as_slice()
+    }
+}
+
+impl Eq for TopKRow {}
+
+impl PartialOrd for TopKRow {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for TopKRow {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.row.cmp(&other.row)
+    }
+}
+
+#[derive(Debug)]
+struct RecordBatchEntry {
+    id: u32,
+    batch: RecordBatch,
+    // for this batch, how many times has it been used
+    uses: usize,
+}
+
+/// This structure tracks [`RecordBatch`] by an id so that:
+///
+/// 1. The batches can be tracked via an id that can be copied cheaply
+/// 2. The total memory held by all batches is tracked
+#[derive(Debug)]
+struct RecordBatchStore {
+    /// id generator
+    next_id: u32,
+    /// storage
+    batches: HashMap<u32, RecordBatchEntry>,
+    /// total size of all record batches tracked by this store
+    batches_size: usize,
+    /// schema of the batches
+    schema: SchemaRef,
+}
+
+impl RecordBatchStore {
+    fn new(schema: SchemaRef) -> Self {
+        Self {
+            next_id: 0,
+            batches: HashMap::new(),
+            batches_size: 0,
+            schema,
+        }
+    }
+
+    /// Register this batch with the store and assign an ID. No
+    /// attempt is made to compare this batch to other batches
+    pub fn register(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        let id = self.next_id;
+        self.next_id += 1;
+        RecordBatchEntry { id, batch, uses: 0 }
+    }
+
+    /// Insert a record batch entry into this store, tracking its
+    /// memory use, if it has any uses
+    pub fn insert(&mut self, entry: RecordBatchEntry) {
+        // uses of 0 means that none of the rows in the batch were stored in the topk
+        if entry.uses > 0 {
+            self.batches_size += entry.batch.get_array_memory_size();
+            self.batches.insert(entry.id, entry);
+        }
+    }
+
+    /// Clear all values in this store, invalidating all previous batch ids
+    fn clear(&mut self) {
+        self.batches.clear();
+        self.batches_size = 0;
+    }
+
+    fn get(&self, id: u32) -> Option<&RecordBatchEntry> {
+        self.batches.get(&id)
+    }
+
+    /// returns the total number of batches stored in this store
+    fn len(&self) -> usize {
+        self.batches.len()
+    }
+
+    /// Returns the total number of rows in batches minus the number
+    /// which are in use
+    fn unused_rows(&self) -> usize {
+        self.batches
+            .values()
+            .map(|batch_entry| batch_entry.batch.num_rows() - batch_entry.uses)
+            .sum()
+    }
+
+    /// returns true if the store has nothing stored
+    fn is_empty(&self) -> bool {
+        self.batches.is_empty()
+    }
+
+    /// return the schema of batches stored
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// remove a use from the specified batch id. If the use count
+    /// reaches zero the batch entry is removed from the store
+    ///
+    /// panics if there were no remaining uses of id
+    pub fn unuse(&mut self, id: u32) {
+        let remove = if let Some(batch_entry) = self.batches.get_mut(&id) {
+            batch_entry.uses = batch_entry.uses.checked_sub(1).expect("underflow");
+            batch_entry.uses == 0
+        } else {
+            panic!("No entry for id {id}");
+        };
+
+        if remove {
+            let old_entry = self.batches.remove(&id).unwrap();
+            self.batches_size = self
+                .batches_size
+                .checked_sub(old_entry.batch.get_array_memory_size())
+                .unwrap();
+        }
+    }
+
+    /// returns the size of memory used by this store, including all
+    /// referenced `RecordBatch`es, in bytes
+    pub fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.batches.capacity()
+                * (std::mem::size_of::<u32>() + std::mem::size_of::<RecordBatchEntry>())
+            + self.batches_size
+    }
+}
+
+/// wrapper over [`arrow::compute::interleave`] that re-encodes
+/// dictionaries that have a low usage (values referenced)
+fn interleave(values: &[&dyn Array], indices: &[(usize, usize)]) -> Result<ArrayRef> {
+    // for now, always re-encode only string dictionaries
+    if !values.is_empty() {
+        match values[0].data_type() {
+            DataType::Dictionary(_key_type, value_type)
+                if value_type.as_ref() == &DataType::Utf8 =>
+            {
+                return interleave_and_repack_dictionary(values, indices);
+            }
+            _ => {}
+        }
+    }
+    // fallback to arrow
+    Ok(arrow::compute::interleave(values, indices)?)
+}
+
+/// Special interleave kernel that re-creates the dictionary values,
+/// ensuring no unused space
+fn interleave_and_repack_dictionary(

Review Comment:
   Thanks - I made the change


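The bookkeeping in `RecordBatchStore` and the trigger in `TopKHeap::maybe_compact` in the diff above can be modeled without arrow. Each batch only needs a row count and a count of rows the heap still references. The sketch below is a simplified, hypothetical model. `Store`, `Entry` and `should_compact` are illustrative names, and memory sizes are not tracked. It keeps the same rules as the diff:

- batches with no referenced rows are never retained;
- a batch is freed when its last reference is evicted;
- compaction is considered only when there are more than two batches and at least `20 * batch_size + k` unreferenced rows.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a `RecordBatchEntry`: only the row count
/// and the number of rows still referenced by the heap are kept.
struct Entry {
    num_rows: usize,
    uses: usize,
}

/// Hypothetical, simplified model of `RecordBatchStore`.
#[derive(Default)]
struct Store {
    next_id: u32,
    batches: HashMap<u32, Entry>,
}

impl Store {
    /// Assign an id; only retain the batch if some of its rows are used.
    fn insert(&mut self, num_rows: usize, uses: usize) -> u32 {
        let id = self.next_id;
        self.next_id += 1;
        if uses > 0 {
            self.batches.insert(id, Entry { num_rows, uses });
        }
        id
    }

    /// Drop one reference (a row was evicted); free the batch at zero.
    fn unuse(&mut self, id: u32) {
        let entry = self.batches.get_mut(&id).expect("unknown batch id");
        entry.uses -= 1;
        if entry.uses == 0 {
            self.batches.remove(&id);
        }
    }

    /// Rows held in memory that the heap no longer references.
    fn unused_rows(&self) -> usize {
        self.batches.values().map(|e| e.num_rows - e.uses).sum()
    }

    /// Mirrors the early return in `maybe_compact`: compact only when
    /// there are more than two batches and enough unused rows.
    fn should_compact(&self, batch_size: usize, k: usize) -> bool {
        let max_unused_rows = 20 * batch_size + k;
        self.batches.len() > 2 && self.unused_rows() >= max_unused_rows
    }
}
```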

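`TopKRow::with_new_row` in the diff above reuses the evicted row's `Vec<u8>` instead of allocating a new one. `owned_size` reports the vector's capacity for memory accounting. Below is a minimal sketch of that allocation-reuse idea. `reuse_row` is a hypothetical helper that mirrors only the `clear` + `extend_from_slice` steps.

```rust
/// Hypothetical mirror of `TopKRow::with_new_row`: overwrite the sort key
/// bytes of an evicted row in place so its heap allocation is reused.
fn reuse_row(mut row: Vec<u8>, new_key: &[u8]) -> Vec<u8> {
    row.clear(); // length becomes 0, capacity is unchanged
    row.extend_from_slice(new_key); // reallocates only if capacity is too small
    row
}
```

Because accounting uses capacity rather than length, a reused buffer keeps being charged for its full allocation even when the new key is shorter.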

Re: [PR] Topk [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1343962711


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sort the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guesstimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than or equal
+                // to the current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items, or the new row is smaller than the current max
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() <= batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// baseline metrics (elapsed compute time, output rows)
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK", for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3, `1, 2, 3`, not the *largest* 3, `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It uses `BinaryHeap` from the Rust standard library, a max-heap,
+/// because it is important to check the current maximum value in the
+/// heap (via `peek`) prior to creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elements to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for at most `k` items using a BinaryHeap (a max-heap),
+    /// so that the largest of the smallest k values so far is on top
+    inner: BinaryHeap<TopKRow>,
+    /// Stores the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, first removes the current largest
+    /// item (the top of the heap).
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();

Review Comment:
   Replaced `sort` with `into_sorted_vec`, which takes advantage of the existing heap ordering instead of sorting from scratch.
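For readers following the `into_sorted_vec` change, here is a minimal, self-contained sketch (not the PR's code) of the same pattern on plain `Ord` values instead of arrow rows. `SmallestK` and `offer` are hypothetical names. Because `BinaryHeap` is a max-heap, `peek` returns the largest retained value, so a candidate can be rejected before any work is done, as `TopKHeap::max` does above. `into_sorted_vec` then returns the survivors in ascending order; in the standard library's current implementation this is an in-place heapsort over the heap's own buffer, which skips the heapify step a fresh `sort` would not benefit from.

```rust
use std::collections::BinaryHeap;

/// Hypothetical bounded container that keeps the `k` smallest values seen.
/// `BinaryHeap` is a max-heap, so `peek` returns the largest retained value,
/// which is both the rejection threshold and the next value to evict.
struct SmallestK<T: Ord> {
    k: usize,
    heap: BinaryHeap<T>,
}

impl<T: Ord> SmallestK<T> {
    fn new(k: usize) -> Self {
        assert!(k > 0, "k must be positive");
        Self { k, heap: BinaryHeap::with_capacity(k) }
    }

    /// Offers `value`, returning `true` if it is now among the k smallest.
    fn offer(&mut self, value: T) -> bool {
        if self.heap.len() < self.k {
            // Not full yet: always keep the value.
            self.heap.push(value);
            return true;
        }
        // Full: compare against the current max before doing any work.
        let max = self.heap.peek().expect("a full heap is non-empty");
        if value >= *max {
            return false;
        }
        // Evict the largest retained value and insert the new one.
        self.heap.pop();
        self.heap.push(value);
        true
    }

    /// Consumes the container; `into_sorted_vec` returns ascending order.
    fn into_sorted(self) -> Vec<T> {
        self.heap.into_sorted_vec()
    }
}
```

For example, offering `5, 1, 4, 2, 3` with `k = 3` leaves `[1, 2, 3]`: `2` evicts `5`, then `3` evicts `4`.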



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1344548482


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// K elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Memory reservation, resized to track the bytes used by this operator
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guesstimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items or new item is lower than the currently k low values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() < batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// baseline metrics (elapsed compute time, output rows)
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK", for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3, `1, 2, 3`, not the *largest* 3, `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It uses `BinaryHeap` from the Rust standard library, a max-heap,
+/// because it is important to check the current maximum value in the
+/// heap (via `peek`) prior to creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elements to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for at most `k` items using a BinaryHeap (a max-heap),
+    /// so that the largest of the smallest k values so far is on top
+    inner: BinaryHeap<TopKRow>,
+    /// Stores the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, first removes the current largest
+    /// item (the top of the heap).
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at a time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)
+            .map(|col| {
+                let input_arrays: Vec<_> = topk_rows
+                    .iter()
+                    .map(|k| {
+                        let entry =
+                            self.store.get(k.batch_id).expect("invalid stored batch id");
+                        entry.batch.column(col) as &dyn Array
+                    })
+                    .collect();
+
+                // at this point `indices` contains indexes within the
+                // rows and `input_arrays` contains a reference to the
+                // relevant Array for that index. `interleave` pulls
+                // them together into a single new array
+                Ok(interleave(&input_arrays, &indices)?)
+            })
+            .collect::<Result<_>>()?;
+
+        let new_batch = RecordBatch::try_new(schema, output_columns)?;
+        Ok((new_batch, topk_rows))
+    }
+
+    /// Compact this heap, rewriting all stored batches into a single
+    /// input batch
+    pub fn maybe_compact(&mut self) -> Result<()> {
+        // we compact if the number of "unused" rows in the store is

Review Comment:
   I'll take a look :)
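The loop in `TopK::emit` quoted above splits the single output batch into `batch_size` chunks with `RecordBatch::slice(offset, length)`. As written, it also pushes a trailing zero-row batch whenever the row count is an exact multiple of `batch_size` (for example, 4 rows with `batch_size = 2` yields batches of 2, 2 and 0 rows). Below is a minimal sketch of the offset/length computation that avoids this, assuming plain `usize` row counts; `chunk_ranges` is a hypothetical helper, not part of DataFusion or arrow-rs.

```rust
/// Hypothetical helper: splits `num_rows` rows into `(offset, length)`
/// ranges of at most `batch_size` rows, matching the argument order of
/// `RecordBatch::slice(offset, length)`.
///
/// No trailing zero-length range is produced when `num_rows` is an exact
/// multiple of `batch_size`. For `num_rows == 0` a single empty range is
/// returned, so callers still emit exactly one (empty) batch.
fn chunk_ranges(num_rows: usize, batch_size: usize) -> Vec<(usize, usize)> {
    assert!(batch_size > 0, "batch_size must be positive");
    if num_rows == 0 {
        return vec![(0, 0)];
    }
    (0..num_rows)
        .step_by(batch_size)
        // The last chunk may be shorter than `batch_size`.
        .map(|offset| (offset, batch_size.min(num_rows - offset)))
        .collect()
}
```

Each pair could then be passed to `batch.slice(offset, length)`. Keeping a single zero-length range for empty input preserves the original behavior of emitting one (empty) batch in that case.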





Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#issuecomment-1747670007

   FYI @gruuya -- it is finally happening

