You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@impala.apache.org by ta...@apache.org on 2020/07/23 15:45:28 UTC

[impala] branch master updated (4502097 -> 6f99cab)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 4502097  IMPALA-9799: Add retries to TestFetchFirst get_num_in_flight_queries calls
     new d3115a5  IMPALA-9979: part 1: factor out Top-N heap.
     new 6f99cab  IMPALA-9929: Subquery error should throw AnalysisException

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/codegen/impala-ir.h                         |   2 +
 be/src/exec/topn-node-ir.cc                        |  47 ++++++----
 be/src/exec/topn-node.cc                           |  82 +++++++++++------
 be/src/exec/topn-node.h                            | 100 +++++++++++++++------
 be/src/util/tuple-row-compare.h                    |   3 +-
 .../org/apache/impala/analysis/SelectStmt.java     |   7 +-
 .../impala/analysis/AnalyzeSubqueriesTest.java     |   4 +
 7 files changed, 170 insertions(+), 75 deletions(-)


[impala] 02/02: IMPALA-9929: Subquery error should throw AnalysisException

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6f99cabdb9ab71d5c022b95716a276b04b5ba132
Author: Shant Hovsepian <sh...@cloudera.com>
AuthorDate: Sat Jul 18 08:38:36 2020 -0400

    IMPALA-9929: Subquery error should throw AnalysisException
    
    Unsupported subquery in the select list should throw an
    AnalysisException.
    
    Testing:
    * Analyzer test to catch this case.
    
    Change-Id: Ic299ea25fd6e505e364528891e737a9af5bcc338
    Reviewed-on: http://gerrit.cloudera.org:8080/16212
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/src/main/java/org/apache/impala/analysis/SelectStmt.java        | 7 ++++---
 .../java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java     | 4 ++++
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 2fca33d..3d520f3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -326,9 +326,10 @@ public class SelectStmt extends QueryStmt {
                   "A subquery which may return more than one row is not supported in "
                   + "the expression: " + item.getExpr().toSql());
             }
-            Preconditions.checkState(((SelectStmt)s.getStatement()).returnsAtMostOneRow(),
-                "Invariant violated: Only subqueries that are guaranteed to return a "
-                    + "single row are supported: " + item.getExpr().toSql());
+            if (!((SelectStmt)s.getStatement()).returnsAtMostOneRow()) {
+              throw new AnalysisException("Only subqueries that are guaranteed to return "
+                   + "a single row are supported: " + item.getExpr().toSql());
+            }
           }
           resultExprs_.add(item.getExpr());
           String label = item.toColumnLabel(i, analyzer_.useHiveColLabels());
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
index e4be4d0..c490450 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
@@ -1387,6 +1387,10 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
             + "id=a.id ) from functional.alltypes a",
         "A correlated scalar subquery is not supported in the expression: "
             + "(SELECT count(*) FROM functional.alltypestiny b WHERE id = a.id)");
+    AnalysisError("select id, min(id) in (select id from functional.alltypestiny) "
+            + "from functional.alltypes a",
+        "Only subqueries that are guaranteed to return a single row are supported: "
+            + "min(id) IN (SELECT id FROM functional.alltypestiny)");
   }
 
   @Test


[impala] 01/02: IMPALA-9979: part 1: factor out Top-N heap.

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d3115a561d257ffba84e61664312b7150b8c888c
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Fri Jul 17 12:37:58 2020 -0700

    IMPALA-9979: part 1: factor out Top-N heap.
    
    This extracts the implementation of the actual
    priority queue from the rest of TopNNode's state,
    so that we can, in the next patch, have multiple
    heaps per node.
    
    The codegen'd InsertBatch() function is unfortunately
    a little sensitive to minor changes in code, because
    of the weird way that it does an indirect call via
    TupleRowComparator - see IMPALA-4065. I had to
    tweak the code a little to find a variant that performed
    similarly to the previous version - other variants had
    small regressions.
    
    Perf:
    Single node TPC-H showed no perf change.
    
    The time for the TOP-N node in this targeted query was
    within the margin of error:
    
    use tpch30_parquet;
    set mt_dop=1;
    select l_extendedprice from lineitem
    order by 1 limit 100
    
    Change-Id: I1f585216b547af7a470e02f75458b1901dc44a31
    Reviewed-on: http://gerrit.cloudera.org:8080/16223
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/codegen/impala-ir.h      |   2 +
 be/src/exec/topn-node-ir.cc     |  47 +++++++++++--------
 be/src/exec/topn-node.cc        |  82 ++++++++++++++++++++++----------
 be/src/exec/topn-node.h         | 100 +++++++++++++++++++++++++++++-----------
 be/src/util/tuple-row-compare.h |   3 +-
 5 files changed, 162 insertions(+), 72 deletions(-)

diff --git a/be/src/codegen/impala-ir.h b/be/src/codegen/impala-ir.h
index e1330ee..f0fa306 100644
--- a/be/src/codegen/impala-ir.h
+++ b/be/src/codegen/impala-ir.h
@@ -29,8 +29,10 @@
 /// best at that optimization setting.
 #define IR_NO_INLINE __attribute__((noinline))
 #define IR_ALWAYS_INLINE __attribute__((always_inline))
+#define IR_LIKELY(expr) __builtin_expect(!!(expr), 1)
 #else
 #define IR_NO_INLINE
 #define IR_ALWAYS_INLINE
+#define IR_LIKELY(expr) expr
 #endif
 
diff --git a/be/src/exec/topn-node-ir.cc b/be/src/exec/topn-node-ir.cc
index 590a4c8..e104fe4 100644
--- a/be/src/exec/topn-node-ir.cc
+++ b/be/src/exec/topn-node-ir.cc
@@ -20,38 +20,47 @@
 using namespace impala;
 
 void TopNNode::InsertBatch(RowBatch* batch) {
-  for (int i = 0; i < batch->num_rows(); ++i) {
-    InsertTupleRow(batch->GetRow(i));
+  // TODO: after inlining the comparator calls with codegen - IMPALA-4065 - we could
+  // probably squeeze more performance out of this loop by ensuring that as many loads
+  // as possible are hoisted out of the loop (either via code changes or __restrict__
+  // annotations).
+  FOREACH_ROW(batch, 0, iter) {
+    bool replaced_existing_row = heap_->InsertTupleRow(this, iter.Get());
+    if (replaced_existing_row) ++rows_to_reclaim_;
   }
 }
 
-// Insert if either not at the limit or it's a new TopN tuple_row
-void TopNNode::InsertTupleRow(TupleRow* input_row) {
+bool TopNNode::Heap::InsertTupleRow(TopNNode* node, TupleRow* input_row) {
+  const TupleDescriptor& tuple_desc = *node->output_tuple_desc_;
+  bool replaced_existing_row = false;
   Tuple* insert_tuple = nullptr;
-
-  if (priority_queue_.size() < limit_ + offset_) {
+  if (priority_queue_.size() < heap_capacity()) {
+    // Add all tuples until we hit capacity.
     insert_tuple = reinterpret_cast<Tuple*>(
-        tuple_pool_->Allocate(output_tuple_desc_->byte_size()));
-    insert_tuple->MaterializeExprs<false, false>(input_row, *output_tuple_desc_,
-        output_tuple_expr_evals_, tuple_pool_.get());
+        node->tuple_pool_->Allocate(node->tuple_byte_size()));
+    insert_tuple->MaterializeExprs<false, false>(input_row, tuple_desc,
+        node->output_tuple_expr_evals_, node->tuple_pool_.get());
   } else {
+    // We're at capacity - compare to the first row in the priority queue to see if
+    // we need to insert this row into the queue.
     DCHECK(!priority_queue_.empty());
     Tuple* top_tuple = priority_queue_.front();
-    tmp_tuple_->MaterializeExprs<false, true>(input_row, *output_tuple_desc_,
-        output_tuple_expr_evals_, nullptr);
-    if (tuple_row_less_than_->Less(tmp_tuple_, top_tuple)) {
-      // TODO: DeepCopy() will allocate new buffers for the string data. This needs
-      // to be fixed to use a freelist
-      tmp_tuple_->DeepCopy(top_tuple, *output_tuple_desc_, tuple_pool_.get());
+    node->tmp_tuple_->MaterializeExprs<false, true>(input_row, tuple_desc,
+        node->output_tuple_expr_evals_, nullptr);
+    if (node->tuple_row_less_than_->Less(node->tmp_tuple_, top_tuple)) {
+      // Pop off the old head, and replace with the new tuple. Deep copy into 'top_tuple'
+      // to reuse the fixed-length memory of 'top_tuple'.
+      node->tmp_tuple_->DeepCopy(top_tuple, tuple_desc, node->tuple_pool_.get());
       insert_tuple = top_tuple;
       PopHeap(&priority_queue_,
-          ComparatorWrapper<TupleRowComparator>(*tuple_row_less_than_));
-      rows_to_reclaim_++;
+          ComparatorWrapper<TupleRowComparator>(*node->tuple_row_less_than_));
+      replaced_existing_row = true;
     }
   }
-
   if (insert_tuple != nullptr) {
     PushHeap(&priority_queue_,
-        ComparatorWrapper<TupleRowComparator>(*tuple_row_less_than_), insert_tuple);
+        ComparatorWrapper<TupleRowComparator>(*node->tuple_row_less_than_),
+        insert_tuple);
   }
+  return replaced_existing_row;
 }
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 406ba3a..ad2f11e 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -39,7 +39,6 @@
 
 #include "common/names.h"
 
-using std::priority_queue;
 using namespace impala;
 
 Status TopNPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
@@ -73,7 +72,7 @@ Status TopNPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const
 TopNNode::TopNNode(
     ObjectPool* pool, const TopNPlanNode& pnode, const DescriptorTbl& descs)
   : ExecNode(pool, pnode, descs),
-    offset_(pnode.tnode_->sort_node.__isset.offset ? pnode.tnode_->sort_node.offset : 0),
+    offset_(pnode.offset()),
     output_tuple_exprs_(pnode.output_tuple_exprs_),
     output_tuple_desc_(pnode.output_tuple_desc_),
     tuple_row_less_than_(new TupleRowLexicalComparator(*pnode.row_comparator_config_)),
@@ -134,6 +133,15 @@ void TopNPlanNode::Codegen(FragmentState* state) {
             materialize_exprs_no_pool_fn, Tuple::MATERIALIZE_EXPRS_NULL_POOL_SYMBOL);
         DCHECK_REPLACE_COUNT(replaced, 1) << LlvmCodeGen::Print(insert_batch_fn);
 
+        replaced = codegen->ReplaceCallSitesWithValue(insert_batch_fn,
+            codegen->GetI64Constant(tnode_->limit + offset()), "heap_capacity");
+        DCHECK_REPLACE_COUNT(replaced, 1) << LlvmCodeGen::Print(insert_batch_fn);
+
+        int tuple_byte_size = output_tuple_desc_->byte_size();
+        replaced = codegen->ReplaceCallSitesWithValue(insert_batch_fn,
+            codegen->GetI32Constant(tuple_byte_size), "tuple_byte_size");
+        DCHECK_REPLACE_COUNT(replaced, 1);
+
         insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn);
         DCHECK(insert_batch_fn != NULL);
         codegen->AddFunctionToJit(insert_batch_fn, &codegend_insert_batch_fn_);
@@ -153,6 +161,8 @@ Status TopNNode::Open(RuntimeState* state) {
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
 
+  heap_.reset(new Heap(limit_ + offset_));
+
   // Allocate memory for a temporary tuple.
   tmp_tuple_ = reinterpret_cast<Tuple*>(
       tuple_pool_->Allocate(output_tuple_desc_->byte_size()));
@@ -184,7 +194,8 @@ Status TopNNode::Open(RuntimeState* state) {
       RETURN_IF_ERROR(QueryMaintenance(state));
     } while (!eos);
   }
-  DCHECK_LE(priority_queue_.size(), limit_ + offset_);
+  // TODO: this check won't be valid once the node supports multiple (partitioned) heaps.
+  DCHECK_LE(heap_->num_tuples(), limit_ + offset_);
   PrepareForOutput();
 
   // Unless we are inside a subplan expecting to call Open()/GetNext() on the child
@@ -226,7 +237,7 @@ Status TopNNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
 }
 
 Status TopNNode::Reset(RuntimeState* state, RowBatch* row_batch) {
-  priority_queue_.clear();
+  heap_->Reset();
   num_rows_skipped_ = 0;
   // Transfer ownership of tuple data to output batch.
   row_batch->tuple_data_pool()->AcquireData(tuple_pool_.get(), false);
@@ -237,42 +248,37 @@ Status TopNNode::Reset(RuntimeState* state, RowBatch* row_batch) {
 
 void TopNNode::Close(RuntimeState* state) {
   if (is_closed()) return;
+  if (heap_ != nullptr) heap_->Close();
   if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll();
   if (tuple_row_less_than_.get() != nullptr) tuple_row_less_than_->Close(state);
   ScalarExprEvaluator::Close(output_tuple_expr_evals_, state);
   ExecNode::Close(state);
 }
 
-// Reverse the order of the tuples in the priority queue
 void TopNNode::PrepareForOutput() {
-  sorted_top_n_.resize(priority_queue_.size());
-  int64_t index = sorted_top_n_.size() - 1;
+  // TODO: this will need to iterate through partitions.
+  heap_->PrepareForOutput(*this, &sorted_top_n_);
+  get_next_iter_ = sorted_top_n_.begin();
+}
+
+void TopNNode::Heap::PrepareForOutput(
+    const TopNNode& RESTRICT node, vector<Tuple*>* sorted_top_n) RESTRICT {
+  // Reverse the order of the tuples in the priority queue
+  sorted_top_n->resize(priority_queue_.size());
+  int64_t index = sorted_top_n->size() - 1;
 
+  ComparatorWrapper<TupleRowComparator> cmp(*node.tuple_row_less_than_);
   while (priority_queue_.size() > 0) {
     Tuple* tuple = priority_queue_.front();
-    PopHeap(&priority_queue_,
-        ComparatorWrapper<TupleRowComparator>(*tuple_row_less_than_));
-    sorted_top_n_[index] = tuple;
+    PopHeap(&priority_queue_, cmp);
+    (*sorted_top_n)[index] = tuple;
     --index;
   }
-
-  get_next_iter_ = sorted_top_n_.begin();
 }
 
 Status TopNNode::ReclaimTuplePool(RuntimeState* state) {
   unique_ptr<MemPool> temp_pool(new MemPool(mem_tracker()));
-
-  for (int i = 0; i < priority_queue_.size(); i++) {
-    Tuple* insert_tuple = reinterpret_cast<Tuple*>(temp_pool->TryAllocate(
-        output_tuple_desc_->byte_size()));
-    if (UNLIKELY(insert_tuple == nullptr)) {
-      return temp_pool->mem_tracker()->MemLimitExceeded(state,
-          "Failed to allocate memory in TopNNode::ReclaimTuplePool.",
-          output_tuple_desc_->byte_size());
-    }
-    priority_queue_[i]->DeepCopy(insert_tuple, *output_tuple_desc_, temp_pool.get());
-    priority_queue_[i] = insert_tuple;
-  }
+  RETURN_IF_ERROR(heap_->RematerializeTuples(this, state, temp_pool.get()));
 
   rows_to_reclaim_ = 0;
   tmp_tuple_ = reinterpret_cast<Tuple*>(temp_pool->TryAllocate(
@@ -283,7 +289,7 @@ Status TopNNode::ReclaimTuplePool(RuntimeState* state) {
         output_tuple_desc_->byte_size());
   }
   tuple_pool_->FreeAll();
-  tuple_pool_.reset(temp_pool.release());
+  tuple_pool_ = move(temp_pool);
   return Status::OK();
 }
 
@@ -301,3 +307,29 @@ void TopNNode::DebugString(int indentation_level, stringstream* out) const {
   ExecNode::DebugString(indentation_level, out);
   *out << ")";
 }
+
+TopNNode::Heap::Heap(int64_t capacity) : capacity_(capacity) {}
+
+void TopNNode::Heap::Reset() {
+  priority_queue_.clear();
+}
+
+void TopNNode::Heap::Close() {
+  priority_queue_.clear();
+}
+
+Status TopNNode::Heap::RematerializeTuples(TopNNode* node,
+    RuntimeState* state, MemPool* new_pool) {
+  const TupleDescriptor& tuple_desc = *node->output_tuple_desc_;
+  int tuple_size = tuple_desc.byte_size();
+  for (int i = 0; i < priority_queue_.size(); i++) {
+    Tuple* insert_tuple = reinterpret_cast<Tuple*>(new_pool->TryAllocate(tuple_size));
+    if (UNLIKELY(insert_tuple == nullptr)) {
+      return new_pool->mem_tracker()->MemLimitExceeded(state,
+          "Failed to allocate memory in TopNNode::ReclaimTuplePool.", tuple_size);
+    }
+    priority_queue_[i]->DeepCopy(insert_tuple, tuple_desc, new_pool);
+    priority_queue_[i] = insert_tuple;
+  }
+  return Status::OK();
+}
diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h
index 8ea6a0b..bb06851 100644
--- a/be/src/exec/topn-node.h
+++ b/be/src/exec/topn-node.h
@@ -15,12 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#pragma once
 
-#ifndef IMPALA_EXEC_TOPN_NODE_H
-#define IMPALA_EXEC_TOPN_NODE_H
-
+#include <memory>
 #include <queue>
-#include <boost/scoped_ptr.hpp>
 
 #include "codegen/codegen-fn-ptr.h"
 #include "codegen/impala-ir.h"
@@ -42,6 +40,9 @@ class TopNPlanNode : public PlanNode {
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
   virtual void Codegen(FragmentState* state) override;
 
+  int64_t offset() const {
+    return tnode_->sort_node.__isset.offset ? tnode_->sort_node.offset : 0;
+  }
   ~TopNPlanNode(){}
 
   /// Ordering expressions used for tuple comparison.
@@ -81,35 +82,23 @@ class TopNNode : public ExecNode {
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:
+  class Heap;
 
   friend class TupleLessThan;
 
   /// Inserts all the rows in 'batch' into the queue.
   void InsertBatch(RowBatch* batch);
 
-  /// Inserts a tuple row into the priority queue if it's in the TopN.  Creates a deep
-  /// copy of tuple_row, which it stores in tuple_pool_.
-  void IR_ALWAYS_INLINE InsertTupleRow(TupleRow* tuple_row);
-
-  /// Flatten and reverse the priority queue.
+  /// Prepare to output all of the rows in this operator in sorted order. Initializes
+  /// 'sorted_top_n_' and 'get_next_iter_'.
   void PrepareForOutput();
 
-  // Re-materialize the elements in the priority queue into a new tuple pool, and release
-  // the previous pool.
+  // Re-materialize all tuples that reference 'tuple_pool_' and release 'tuple_pool_',
+  // replacing it with a new pool.
   Status ReclaimTuplePool(RuntimeState* state);
 
-  /// Helper methods for modifying priority_queue while maintaining ordered heap
-  /// invariants
-  inline static void PushHeap(std::vector<Tuple*>* priority_queue,
-      const ComparatorWrapper<TupleRowComparator>& comparator, Tuple* const insert_row) {
-    priority_queue->push_back(insert_row);
-    std::push_heap(priority_queue->begin(), priority_queue->end(), comparator);
-  }
-
-  inline static void PopHeap(std::vector<Tuple*>* priority_queue,
-      const ComparatorWrapper<TupleRowComparator>& comparator) {
-    std::pop_heap(priority_queue->begin(), priority_queue->end(), comparator);
-    priority_queue->pop_back();
+  IR_NO_INLINE int tuple_byte_size() const {
+    return output_tuple_desc_->byte_size();
   }
 
   /// Number of rows to skip.
@@ -123,7 +112,7 @@ class TopNNode : public ExecNode {
   TupleDescriptor* const output_tuple_desc_;
 
   /// Comparator for priority_queue_.
-  boost::scoped_ptr<TupleRowComparator> tuple_row_less_than_;
+  std::unique_ptr<TupleRowComparator> tuple_row_less_than_;
 
   /// After computing the TopN in the priority_queue, pop them and put them in this vector
   std::vector<Tuple*> sorted_top_n_;
@@ -134,7 +123,7 @@ class TopNNode : public ExecNode {
   Tuple* tmp_tuple_ = nullptr;
 
   /// Stores everything referenced in priority_queue_.
-  boost::scoped_ptr<MemPool> tuple_pool_;
+  std::unique_ptr<MemPool> tuple_pool_;
 
   /// Iterator over elements in sorted_top_n_.
   std::vector<Tuple*>::iterator get_next_iter_;
@@ -155,9 +144,54 @@ class TopNNode : public ExecNode {
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
+  /// A heap containing up to 'limit_' + 'offset_' rows.
+  std::unique_ptr<Heap> heap_;
+
   /// Number of rows skipped. Used for adhering to offset_.
   int64_t num_rows_skipped_;
 
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+};
+
+/// This is the main data structure used for in-memory Top-N: a binary heap containing
+/// up to 'capacity' tuples.
+class TopNNode::Heap {
+ public:
+  Heap(int64_t capacity);
+
+  void Reset();
+  void Close();
+
+  /// Inserts a tuple row into the priority queue if it's in the TopN. Creates a deep
+  /// copy of 'input_row', which it stores in 'node->tuple_pool_'. Always inlined in
+  /// IR into TopNNode::InsertBatch() because codegen relies on this for substituting
+  /// exprs in the body of TopNNode.
+  /// Returns true if a previous row was replaced.
+  bool IR_ALWAYS_INLINE InsertTupleRow(TopNNode* node, TupleRow* input_row);
+
+  /// Deep-copy the tuples in the priority queue into 'new_pool' and repoint the queue
+  /// at the copies. Does not free the previous pool; the caller is responsible for that.
+  Status RematerializeTuples(TopNNode* node, RuntimeState* state, MemPool* new_pool);
+
+  /// Put the tuples in the priority queue into 'sorted_top_n' in the correct order
+  /// for output.
+  void PrepareForOutput(
+      const TopNNode& RESTRICT node, std::vector<Tuple*>* sorted_top_n) RESTRICT;
+
+  /// Returns number of tuples currently in heap.
+  int64_t num_tuples() const { return priority_queue_.size(); }
+
+  IR_NO_INLINE int64_t heap_capacity() const { return capacity_; }
+
+ private:
+  /// Limit on capacity of 'priority_queue_'. If inserting a tuple into the queue
+  /// would exceed this, a tuple is popped off the queue.
+  const int64_t capacity_;
+
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
   /// The priority queue (represented by a vector and modified using
   /// push_heap()/pop_heap() to maintain ordered heap invariants) will never have more
   /// elements in it than the LIMIT + OFFSET. The order of the queue is the opposite of
@@ -167,8 +201,20 @@ class TopNNode : public ExecNode {
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////
-};
 
+  /// Helper methods for modifying priority_queue while maintaining ordered heap
+  /// invariants
+  inline static void PushHeap(std::vector<Tuple*>* priority_queue,
+      const ComparatorWrapper<TupleRowComparator>& comparator, Tuple* const insert_row) {
+    priority_queue->push_back(insert_row);
+    std::push_heap(priority_queue->begin(), priority_queue->end(), comparator);
+  }
+
+  inline static void PopHeap(std::vector<Tuple*>* priority_queue,
+      const ComparatorWrapper<TupleRowComparator>& comparator) {
+    std::pop_heap(priority_queue->begin(), priority_queue->end(), comparator);
+    priority_queue->pop_back();
+  }
 };
 
-#endif
+}; // namespace impala
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index 7dc0a23..ce2ddcb 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -20,6 +20,7 @@
 #define IMPALA_UTIL_TUPLE_ROW_COMPARE_H_
 
 #include "codegen/codegen-fn-ptr.h"
+#include "codegen/impala-ir.h"
 #include "common/compiler-util.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
@@ -127,7 +128,7 @@ class TupleRowComparator {
   int ALWAYS_INLINE Compare(const TupleRow* lhs, const TupleRow* rhs) const {
     const TupleRowComparatorConfig::CompareFn codegend_compare_fn =
         codegend_compare_fn_.load();
-    if (codegend_compare_fn != nullptr) {
+    if (IR_LIKELY(codegend_compare_fn != nullptr)) {
       return codegend_compare_fn(ordering_expr_evals_lhs_.data(),
           ordering_expr_evals_rhs_.data(), lhs, rhs);
     }