You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/10/03 13:50:38 UTC

Re: [PR] Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator [arrow-datafusion]

alamb commented on code in PR #7721:
URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1344112214


##########
datafusion/sqllogictest/test_files/aal.slt:
##########
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   this file probably at least needs to be renamed to topk.slt or something other than my initials (`AAL` 😆 )
   
   I am also not sure it is the best test of the topk coverage -- I had this for quickly iterating during development



##########
datafusion/sqllogictest/test_files/window.slt:
##########
@@ -2673,14 +2673,14 @@ SELECT
   LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS leadr1,
   LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as leadr2
   FROM annotated_data_finite
-  ORDER BY ts DESC
+  ORDER BY ts DESC, fv2
   LIMIT 5;
 ----
-289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289
-289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286
-289 261 296 301 NULL 275 98 98 98 98 85 85 291 289 291 1004 305 305 296 291 301 305 301 283
-286 259 291 296 NULL 272 97 97 97 97 84 84 289 286 289 1004 305 305 291 289 296 301 296 278
-275 254 289 291 289 269 96 96 96 96 83 83 286 283 286 305 305 305 289 286 291 296 291 275
+264 289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286

Review Comment:
   added ts to show that the first two values are tied, and that the output is correct ✅ 



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TOOD: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guestimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items or new item is lower than the currently k low values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() < batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3 , `1, 2, 3`, not the *largest* 3 `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It doesn't use `BinaryHeap` in the Rust standard library because
+/// it is important to check the current minimum value in the heap
+/// prior to creating a new value to insert.

Review Comment:
   I think this comment is out of date (and mostly due to my misunderstanding of how the BinaryHeap worked 😆 )
   
   ```suggestion
   ```



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TOOD: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guestimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:

Review Comment:
   I like the suggestion to try and filter earlier. However, I think it should be driven by profiling and benchmarks. Maybe put that idea in comments as a "Potential future improvement" ?



##########
datafusion/sqllogictest/test_files/aal.slt:
##########
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   this file probably at least needs to be renamed to topk.slt or something other than my initials (`AAL` 😆 )
   
   I am also not sure it is the best test of the topk coverage -- I had this for quickly iterating during development



##########
datafusion/sqllogictest/test_files/window.slt:
##########
@@ -2673,14 +2673,14 @@ SELECT
   LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS leadr1,
   LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as leadr2
   FROM annotated_data_finite
-  ORDER BY ts DESC
+  ORDER BY ts DESC, fv2
   LIMIT 5;
 ----
-289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289
-289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286
-289 261 296 301 NULL 275 98 98 98 98 85 85 291 289 291 1004 305 305 296 291 301 305 301 283
-286 259 291 296 NULL 272 97 97 97 97 84 84 289 286 289 1004 305 305 291 289 296 301 296 278
-275 254 289 291 289 269 96 96 96 96 83 83 286 283 286 305 305 305 289 286 291 296 291 275
+264 289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286

Review Comment:
   added ts to show that the first two values are tied, and that the output is correct ✅ 



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TOOD: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guestimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items or new item is lower than the currently k low values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() < batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3 , `1, 2, 3`, not the *largest* 3 `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It doesn't use `BinaryHeap` in the Rust standard library because
+/// it is important to check the current minimum value in the heap
+/// prior to creating a new value to insert.

Review Comment:
   I think this comment is out of date (and mostly due to my misunderstanding of how the BinaryHeap worked 😆 )
   
   ```suggestion
   ```



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TOOD: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guestimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items or new item is lower than the currently k low values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() < batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3 , `1, 2, 3`, not the *largest* 3 `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It doesn't use `BinaryHeap` in the Rust standard library because
+/// it is important to check the current minimum value in the heap
+/// prior to creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elemenents to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for up at most `k` items using a BinaryHeap. Reverserd
+    /// so that the smallest k so far is on the top
+    inner: BinaryHeap<TopKRow>,
+    /// Storage the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, removes the previously smallest
+    /// item.
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)

Review Comment:
   This is like `interleave` but for `RecordBatches`. @wjones127  was asking about something similar for `take` for `RecordBatches` as well. Maybe it would be a helpful API to add 🤔 



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TOOD: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;

Review Comment:
   ```suggestion
           // TODO there is potential to add special cases for single column sort fields
           // to improve performance
           let row_converter = RowConverter::new(sort_fields)?;
   ```



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -765,7 +766,12 @@ impl DisplayAs for SortExec {
                 let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
                 match self.fetch {
                     Some(fetch) => {
-                        write!(f, "SortExec: fetch={fetch}, expr=[{}]", expr.join(","))
+                        write!(
+                            f,
+                            // TODO should this say topk?

Review Comment:
   I think in general it would be good to be able to tell what operator was going to be used from looking at the plan. However, I think we can do so as a follow on PR -- I can file a ticket. 



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TOOD: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guestimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items or new item is lower than the currently k low values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() < batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3 , `1, 2, 3`, not the *largest* 3 `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It doesn't use `BinaryHeap` in the Rust standard library because
+/// it is important to check the current minimum value in the heap
+/// prior to creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elemenents to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for up at most `k` items using a BinaryHeap. Reverserd
+    /// so that the smallest k so far is on the top
+    inner: BinaryHeap<TopKRow>,
+    /// Storage the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, removes the previously smallest
+    /// item.
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)

Review Comment:
   This is like `interleave` but for `RecordBatches`. @wjones127  was asking about something similar for `take` for `RecordBatches` as well. Maybe it would be a helpful API to add 🤔 



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TOOD: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guestimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among

Review Comment:
   ```suggestion
       /// Insert `batch`, remembering if any of its values are among
   ```



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.

Review Comment:
   ```suggestion
   /// K=3 elements, reducing the total amount of required buffer memory.
   ```



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TopK: Combination of Sort / LIMIT
+
+use arrow::{
+    compute::interleave,
+    row::{RowConverter, Rows, SortField},
+};
+use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
+
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::{
+    memory_pool::{MemoryConsumer, MemoryReservation},
+    runtime_env::RuntimeEnv,
+};
+use datafusion_physical_expr::PhysicalSortExpr;
+use hashbrown::HashMap;
+
+use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
+
+/// Global TopK
+///
+/// # Background
+///
+/// "Top K" is a common query optimization used for queries such as
+/// "find the top 3 customers by revenue". The (simplified) SQL for
+/// such a query might be:
+///
+/// ```sql
+/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3;
+/// ```
+///
+/// The simple plan would be:
+///
+/// ```sql
+/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+/// +--------------+----------------------------------------+
+/// | plan_type    | plan                                   |
+/// +--------------+----------------------------------------+
+/// | logical_plan | Limit: 3                               |
+/// |              |   Sort: revenue DESC NULLS FIRST       |
+/// |              |     Projection: customer_id, revenue   |
+/// |              |       TableScan: sales                 |
+/// +--------------+----------------------------------------+
+/// ```
+///
+/// While this plan produces the correct answer, it will fully sorts the
+/// input before discarding everything other than the top 3 elements.
+///
+/// The same answer can be produced by simply keeping track of the top
+/// N elements, reducing the total amount of required buffer memory.
+///
+/// # Structure
+///
+/// This operator tracks the top K items using a `TopKHeap`.
+pub struct TopK {
+    /// schema of the output (and the input)
+    schema: SchemaRef,
+    /// Runtime metrics
+    metrics: TopKMetrics,
+    /// Reservation
+    reservation: MemoryReservation,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// sort expressions
+    expr: Arc<[PhysicalSortExpr]>,
+    /// row converter, for sort keys
+    row_converter: RowConverter,
+    /// scratch space for converting rows
+    scratch_rows: Rows,
+    /// stores the top k values and their sort key values, in order
+    heap: TopKHeap,
+}
+
+impl TopK {
+    /// Create a new [`TopK`] that stores the top `k` values, as
+    /// defined by the sort expressions in `expr`.
+    // TOOD: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        k: usize,
+        batch_size: usize,
+        runtime: Arc<RuntimeEnv>,
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+            .register(&runtime.memory_pool);
+
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
+
+        let sort_fields: Vec<_> = expr
+            .iter()
+            .map(|e| {
+                Ok(SortField::new_with_options(
+                    e.expr.data_type(&schema)?,
+                    e.options,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        let row_converter = RowConverter::new(sort_fields)?;
+        let scratch_rows = row_converter.empty_rows(
+            batch_size,
+            20 * batch_size, // guestimate 20 bytes per row
+        );
+
+        Ok(Self {
+            schema: schema.clone(),
+            metrics: TopKMetrics::new(metrics, partition),
+            reservation,
+            batch_size,
+            expr,
+            row_converter,
+            scratch_rows,
+            heap: TopKHeap::new(k, batch_size, schema),
+        })
+    }
+
+    /// Insert `batch`, remembering it if any of its values are among
+    /// the top k seen so far.
+    pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Updates on drop
+        let _timer = self.metrics.baseline.elapsed_compute().timer();
+
+        let sort_keys: Vec<ArrayRef> = self
+            .expr
+            .iter()
+            .map(|expr| {
+                let value = expr.expr.evaluate(&batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // reuse existing `Rows` to avoid reallocations
+        let rows = &mut self.scratch_rows;
+        rows.clear();
+        self.row_converter.append(rows, &sort_keys)?;
+
+        // TODO make this algorithmically better?:
+        // 1. only check topk values in rows
+        // 2. only do one update through top_k
+
+        let mut batch_entry = self.heap.register_batch(batch);
+        for (index, row) in rows.iter().enumerate() {
+            match self.heap.max() {
+                // heap has k items, and the new row is greater than the
+                // current max in the heap ==> it is not a new topk
+                Some(max_row) if row.as_ref() >= max_row.row() => {}
+                // don't yet have k items or new item is lower than the currently k low values
+                None | Some(_) => {
+                    self.heap.add(&mut batch_entry, row, index);
+                    self.metrics.row_replacements.add(1);
+                }
+            }
+        }
+        self.heap.insert_batch_entry(batch_entry);
+
+        // conserve memory
+        self.heap.maybe_compact()?;
+
+        // update memory reservation
+        self.reservation.try_resize(self.size())?;
+        Ok(())
+    }
+
+    /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap
+    pub fn emit(self) -> Result<SendableRecordBatchStream> {
+        let Self {
+            schema,
+            metrics,
+            reservation: _,
+            batch_size,
+            expr: _,
+            row_converter: _,
+            scratch_rows: _,
+            mut heap,
+        } = self;
+        let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
+
+        let mut batch = heap.emit()?;
+        metrics.baseline.output_rows().add(batch.num_rows());
+
+        // break into record batches as needed
+        let mut batches = vec![];
+        loop {
+            if batch.num_rows() < batch_size {
+                batches.push(Ok(batch));
+                break;
+            } else {
+                batches.push(Ok(batch.slice(0, batch_size)));
+                let remaining_length = batch.num_rows() - batch_size;
+                batch = batch.slice(batch_size, remaining_length);
+            }
+        }
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::iter(batches),
+        )))
+    }
+
+    /// return the size of memory used by this operator, in bytes
+    fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.row_converter.size()
+            + self.scratch_rows.size()
+            + self.heap.size()
+    }
+}
+
+struct TopKMetrics {
+    /// metrics
+    pub baseline: BaselineMetrics,
+
+    /// count of how many rows were replaced in the heap
+    pub row_replacements: Count,
+}
+
+impl TopKMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            row_replacements: MetricBuilder::new(metrics)
+                .counter("row_replacements", partition),
+        }
+    }
+}
+
+/// This structure keeps at most the *smallest* k items, using the
+/// [arrow::row] format for sort keys. While it is called "topK" for
+/// values like `1, 2, 3, 4, 5` the "top 3" really means the
+/// *smallest* 3 , `1, 2, 3`, not the *largest* 3 `3, 4, 5`.
+///
+/// Using the `Row` format handles things such as ascending vs
+/// descending and nulls first vs nulls last.
+///
+/// It doesn't use `BinaryHeap` in the Rust standard library because
+/// it is important to check the current minimum value in the heap
+/// prior to creating a new value to insert.
+struct TopKHeap {
+    /// The maximum number of elemenents to store in this heap.
+    k: usize,
+    /// The target number of rows for output batches
+    batch_size: usize,
+    /// Storage for up at most `k` items using a BinaryHeap. Reverserd
+    /// so that the smallest k so far is on the top
+    inner: BinaryHeap<TopKRow>,
+    /// Storage the original row values (TopKRow only has the sort key)
+    store: RecordBatchStore,
+    /// The size of all owned data held by this heap
+    owned_bytes: usize,
+}
+
+impl TopKHeap {
+    fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self {
+        assert!(k > 0);
+        Self {
+            k,
+            batch_size,
+            inner: BinaryHeap::new(),
+            store: RecordBatchStore::new(schema),
+            owned_bytes: 0,
+        }
+    }
+
+    /// Register a [`RecordBatch`] with the heap, returning the
+    /// appropriate entry
+    pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry {
+        self.store.register(batch)
+    }
+
+    /// Insert a [`RecordBatchEntry`] created by a previous call to
+    /// [`Self::register_batch`] into storage.
+    pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) {
+        self.store.insert(entry)
+    }
+
+    /// Returns the largest value stored by the heap if there are k
+    /// items, otherwise returns None. Remember this structure is
+    /// keeping the "smallest" k values
+    fn max(&self) -> Option<&TopKRow> {
+        if self.inner.len() < self.k {
+            None
+        } else {
+            self.inner.peek()
+        }
+    }
+
+    /// Adds `row` to this heap. If inserting this new item would
+    /// increase the size past `k`, removes the previously smallest
+    /// item.
+    fn add(
+        &mut self,
+        batch_entry: &mut RecordBatchEntry,
+        row: impl AsRef<[u8]>,
+        index: usize,
+    ) {
+        let batch_id = batch_entry.id;
+        batch_entry.uses += 1;
+
+        assert!(self.inner.len() <= self.k);
+        let row = row.as_ref();
+
+        // Reuse storage for evicted item if possible
+        let new_top_k = if self.inner.len() == self.k {
+            let prev_min = self.inner.pop().unwrap();
+
+            // Update batch use
+            if prev_min.batch_id == batch_entry.id {
+                batch_entry.uses -= 1;
+            } else {
+                self.store.unuse(prev_min.batch_id);
+            }
+
+            // update memory accounting
+            self.owned_bytes -= prev_min.owned_size();
+            prev_min.with_new_row(row, batch_id, index)
+        } else {
+            TopKRow::new(row, batch_id, index)
+        };
+
+        self.owned_bytes += new_top_k.owned_size();
+
+        // put the new row into the heap
+        self.inner.push(new_top_k)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], resetting the inner heap
+    pub fn emit(&mut self) -> Result<RecordBatch> {
+        Ok(self.emit_with_state()?.0)
+    }
+
+    /// Returns the values stored in this heap, from values low to
+    /// high, as a single [`RecordBatch`], and a sorted vec of the
+    /// current heap's contents
+    pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> {
+        let schema = self.store.schema().clone();
+
+        // generate sorted rows
+        let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
+
+        if self.store.is_empty() {
+            return Ok((RecordBatch::new_empty(schema), topk_rows));
+        }
+
+        // Indices for each row within its respective RecordBatch
+        let indices: Vec<_> = topk_rows
+            .iter()
+            .enumerate()
+            .map(|(i, k)| (i, k.index))
+            .collect();
+
+        let num_columns = schema.fields().len();
+
+        // build the output columns one at time, using the
+        // `interleave` kernel to pick rows from different arrays
+        let output_columns: Vec<_> = (0..num_columns)
+            .map(|col| {
+                let input_arrays: Vec<_> = topk_rows
+                    .iter()
+                    .map(|k| {
+                        let entry =
+                            self.store.get(k.batch_id).expect("invalid stored batch id");
+                        entry.batch.column(col) as &dyn Array
+                    })
+                    .collect();
+
+                // at this point `indices` contains indexes within the
+                // rows and `input_arrays` contains a reference to the
+                // relevant Array for that index. `interleave` pulls
+                // them together into a single new array
+                Ok(interleave(&input_arrays, &indices)?)
+            })
+            .collect::<Result<_>>()?;
+
+        let new_batch = RecordBatch::try_new(schema, output_columns)?;
+        Ok((new_batch, topk_rows))
+    }
+
+    /// Compact this heap, rewriting all stored batches into a single
+    /// input batch
+    pub fn maybe_compact(&mut self) -> Result<()> {
+        // we compact if the number of "unused" rows in the store is

Review Comment:
   @Dandandan  did you review this heuristic -- I remember I tried it on our high cardinality tracing usecase and it seemed to work well (basically it is needed for large N with random-ish inputs)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org