You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/03/02 00:21:32 UTC

[impala] 01/03: IMPALA-11477: Adding Codegen to sorted-run-merger

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 939a6ae14e5416845a43d3d41d01c43e8123a0b9
Author: noemi <np...@cloudera.com>
AuthorDate: Mon Jun 20 15:08:46 2022 +0200

    IMPALA-11477: Adding Codegen to sorted-run-merger
    
    SortedRunMerger is used to merge multiple, already sorted runs.
    It is used for the external merge in the sorter (SortNode) and in
    the KRPC data stream receiver (ExchangeNode).
    
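    SortedRunMerger builds and maintains a min heap of the sorted input
    runs. Rewrote the recursive SortedRunMerger::Heapify as an iterative
    SortedRunMerger::HeapifyHelper and moved it to a separate new source
    file: sorted-run-merger-ir.cc. Heapify now calls the codegened
    HeapifyHelper when available and the interpreted one otherwise.
    Added a static Codegen() to SortedRunMerger, which is called from the
    Codegen() of the corresponding plan nodes: SortPlanNode and
    ExchangePlanNode.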
    
    This change lets the merger use the codegened version of the
    TupleRowComparator compare function instead of the interpreted one,
    which can improve performance, especially for complex comparison
    expressions. This change also serves as a base for further
    codegen-related optimizations in the merger.
    
    Testing:
     - run existing E2E sort tests (test-sort.py)
     - manual testing: run queries that instantiate sort nodes and
       merging exchange nodes
    Benchmarking:
     - did not cause regression on TPCH query set
     - made merge-intensive queries and IMPALA-4530 (in-memory merge of
       quicksorted small runs) faster
    
    Change-Id: Ic35c7460bdbd54b8ec5872a83680e2f41ceae9fd
    Reviewed-on: http://gerrit.cloudera.org:8080/18824
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/codegen/gen_ir_descriptions.py    |   4 +-
 be/src/codegen/impala-ir.cc              |   1 +
 be/src/exec/exchange-node.cc             |  13 ++-
 be/src/exec/exchange-node.h              |   5 +-
 be/src/exec/sort-node.cc                 |   7 +-
 be/src/exec/sort-node.h                  |   5 +-
 be/src/runtime/CMakeLists.txt            |   1 +
 be/src/runtime/data-stream-test.cc       |   6 +-
 be/src/runtime/krpc-data-stream-recvr.cc |   6 +-
 be/src/runtime/krpc-data-stream-recvr.h  |   4 +-
 be/src/runtime/sorted-run-merger-ir.cc   |  52 ++++++++++
 be/src/runtime/sorted-run-merger.cc      | 170 ++++++++++++++-----------------
 be/src/runtime/sorted-run-merger.h       |  64 +++++++++++-
 be/src/runtime/sorter.cc                 |   6 +-
 be/src/runtime/sorter.h                  |  14 ++-
 15 files changed, 253 insertions(+), 105 deletions(-)

diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 3ed2935f4..72c23adee 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -259,8 +259,10 @@ ir_functions = [
    "_ZN6impala14WriteKuduValueEiRKNS_10ColumnTypeEPKvbPN4kudu14KuduPartialRowE"],
   ["GET_KUDU_PARTITION_ROW",
    "_ZN6impala19GetKuduPartitionRowEPN4kudu6client15KuduPartitionerEPNS0_14KuduPartialRowE"],
-   ["TUPLE_SORTER_SORT_HELPER",
+  ["TUPLE_SORTER_SORT_HELPER",
    "_ZN6impala6Sorter11TupleSorter10SortHelperENS0_13TupleIteratorES2_"],
+  ["SORTED_RUN_MERGER_HEAPIFY_HELPER",
+   "_ZN6impala15SortedRunMerger13HeapifyHelperEi"]
 ]
 
 enums_preamble = '\
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 4aae36534..10ce8eb16 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -69,6 +69,7 @@
 #include "runtime/runtime-filter-ir.cc"
 #include "runtime/sorter-ir.cc"
 #include "runtime/tuple-ir.cc"
+#include "runtime/sorted-run-merger-ir.cc"
 #include "udf/udf-ir.cc"
 #include "util/bloom-filter-ir.cc"
 #include "util/hash-util-ir.cc"
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 9a2894d6d..b605ba0a7 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -121,7 +121,14 @@ void ExchangePlanNode::Codegen(FragmentState* state) {
   if (IsNodeCodegenDisabled()) return;
 
   if (row_comparator_config_ != nullptr) {
-    AddCodegenStatus(row_comparator_config_->Codegen(state));
+    Status codegen_status;
+    llvm::Function* compare_fn = nullptr;
+    codegen_status = row_comparator_config_->Codegen(state, &compare_fn);
+    if (codegen_status.ok()) {
+      codegen_status =
+          SortedRunMerger::Codegen(state, compare_fn, &codegend_heapify_helper_fn_);
+    }
+    AddCodegenStatus(codegen_status);
   }
 }
 
@@ -131,11 +138,13 @@ Status ExchangeNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Open(state));
   RETURN_IF_CANCELLED(state);
   if (is_merging_) {
+    const ExchangePlanNode& pnode = static_cast<const ExchangePlanNode&>(plan_node_);
     // CreateMerger() will populate its merging heap with batches from the stream_recvr_,
     // so it is not necessary to call FillInputRowBatch().
     RETURN_IF_ERROR(
         less_than_->Open(pool_, state, expr_perm_pool(), expr_results_pool()));
-    RETURN_IF_ERROR(stream_recvr_->CreateMerger(*less_than_.get()));
+    RETURN_IF_ERROR(stream_recvr_->CreateMerger(*less_than_.get(),
+        pnode.codegend_heapify_helper_fn_));
   } else {
     RETURN_IF_ERROR(FillInputRowBatch(state));
   }
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index ab5c3c5da..6a6c64196 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -21,7 +21,7 @@
 
 #include <boost/scoped_ptr.hpp>
 #include "exec/exec-node.h"
-
+#include "runtime/sorted-run-merger.h"
 #include "runtime/bufferpool/buffer-pool.h"
 
 namespace impala {
@@ -46,6 +46,9 @@ class ExchangePlanNode : public PlanNode {
 
   /// Config used to create a TupleRowComparator instance. Non null for merging exchange.
   TupleRowComparatorConfig* row_comparator_config_ = nullptr;
+
+  /// Codegened version of SortedRunMerger::HeapifyHelper().
+  CodegenFnPtr<SortedRunMerger::HeapifyHelperFn> codegend_heapify_helper_fn_;
 };
 
 /// Receiver node for data streams. The data stream receiver is created in Prepare()
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 223b689a9..d93b5a39d 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -81,7 +81,8 @@ Status SortNode::Prepare(RuntimeState* state) {
   sorter_.reset(new Sorter(tuple_row_comparator_config_, sort_tuple_exprs_,
       &row_descriptor_, mem_tracker(), buffer_pool_client(),
       resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), true,
-      pnode.codegend_sort_helper_fn_, ComputeInputSizeEstimate()));
+      pnode.codegend_sort_helper_fn_, pnode.codegend_heapify_helper_fn_,
+      ComputeInputSizeEstimate()));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   return Status::OK();
@@ -115,6 +116,10 @@ void SortPlanNode::Codegen(FragmentState* state) {
             row_descriptor_->tuple_descriptors()[0]->byte_size(),
             &codegend_sort_helper_fn_);
   }
+  if (codegen_status.ok()) {
+    codegen_status =
+        SortedRunMerger::Codegen(state, compare_fn, &codegend_heapify_helper_fn_);
+  }
   AddCodegenStatus(codegen_status);
 }
 
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index 42ec77291..393511d49 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -45,8 +45,11 @@ class SortPlanNode : public PlanNode {
   /// Number of backend nodes executing the same sort node plan.
   int32_t num_backends_;
 
-  /// Codegened version of Sort::TupelSorter::SortHelper().
+  /// Codegened version of Sorter::TupleSorter::SortHelper().
   CodegenFnPtr<Sorter::SortHelperFn> codegend_sort_helper_fn_;
+
+  /// Codegened version of SortedRunMerger::HeapifyHelper().
+  CodegenFnPtr<SortedRunMerger::HeapifyHelperFn> codegend_heapify_helper_fn_;
 };
 
 /// Node that implements a full sort of its input with a fixed memory budget, spilling
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index b04e04eb1..7da63826e 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -81,6 +81,7 @@ add_library(Runtime
   runtime-state.cc
   scanner-mem-limiter.cc
   sorted-run-merger.cc
+  sorted-run-merger-ir.cc
   sorter.cc
   spillable-row-batch-queue.cc
   string-value.cc
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index a167caf97..a41d3436b 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -449,7 +449,11 @@ class DataStreamTest : public testing::Test {
 
   void ReadStreamMerging(ReceiverInfo* info, RuntimeProfile* profile,
       TupleRowComparator* less_than_comparator) {
-    info->status = info->stream_recvr->CreateMerger(*less_than_comparator);
+    /// Note that codegend_heapify_helper_fn currently stores a nullptr.
+    /// The codegened case of this function is covered in end-to-end tests.
+    CodegenFnPtr<SortedRunMerger::HeapifyHelperFn> codegend_heapify_helper_fn;
+    info->status = info->stream_recvr->CreateMerger(
+        *less_than_comparator, codegend_heapify_helper_fn);
     if (info->status.IsCancelled()) return;
     RowBatch batch(row_desc_, 1024, &tracker_);
     VLOG_QUERY << "start reading merging";
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 6fd42236f..eab53eaba 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -639,14 +639,16 @@ void KrpcDataStreamRecvr::SenderQueue::Close() {
   current_batch_.reset();
 }
 
-Status KrpcDataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
+Status KrpcDataStreamRecvr::CreateMerger(const TupleRowComparator& less_than,
+    const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn>& codegend_heapify_helper_fn) {
   DCHECK(is_merging_);
   DCHECK(TestInfo::is_test() || FragmentInstanceState::IsFragmentExecThread());
   vector<SortedRunMerger::RunBatchSupplierFn> input_batch_suppliers;
   input_batch_suppliers.reserve(sender_queues_.size());
 
   // Create the merger that will a single stream of sorted rows.
-  merger_.reset(new SortedRunMerger(less_than, row_desc_, profile_, false));
+  merger_.reset(new SortedRunMerger(less_than, row_desc_, profile_, false,
+      codegend_heapify_helper_fn));
 
   for (SenderQueue* queue: sender_queues_) {
     input_batch_suppliers.push_back(
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index 4d689e5c9..f77119769 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -27,6 +27,7 @@
 #include "runtime/descriptors.h"
 #include "runtime/runtime-state.h"
 #include "util/tuple-row-compare.h"
+#include "runtime/sorted-run-merger.h"
 
 namespace kudu {
 namespace rpc {
@@ -103,7 +104,8 @@ class KrpcDataStreamRecvr {
   /// the specified row comparator. Fetches the first batches from the individual sender
   /// queues. The exprs used in less_than must have already been prepared and opened.
   /// Called from fragment instance execution threads only.
-  Status CreateMerger(const TupleRowComparator& less_than);
+  Status CreateMerger(const TupleRowComparator& less_than,
+      const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn>& codegend_heapify_helper_fn);
 
   /// Fill output_batch with the next batch of rows obtained by merging the per-sender
   /// input streams. Must only be called if is_merging_ is true. Called from fragment
diff --git a/be/src/runtime/sorted-run-merger-ir.cc b/be/src/runtime/sorted-run-merger-ir.cc
new file mode 100644
index 000000000..d212f02ed
--- /dev/null
+++ b/be/src/runtime/sorted-run-merger-ir.cc
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/exec-env.h"
+#include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
+#include "runtime/sorted-run-merger.h"
+
+namespace impala {
+
+void SortedRunMerger::HeapifyHelper(int parent_index) {  // Iterative sift-down.
+  int left_index = 2 * parent_index + 1;  // Children of i are 2*i+1 and 2*i+2.
+  int right_index = left_index + 1;
+  int least_child;  // Assigned in every iteration before it is read.
+  while (left_index < min_heap_.size()) {  // Stop once the node has no children.
+    // Find the least child of parent. Without a right child, the left one is least.
+    if (right_index >= min_heap_.size() ||
+        comparator_.Less(min_heap_[left_index]->current_row(),
+                        min_heap_[right_index]->current_row())) {
+      least_child = left_index;
+    } else {
+      least_child = right_index;
+    }
+    // Codegen() DCHECKs 2 replaced compare call sites: the 2 Less() calls in here.
+    // If the parent is out of place, swap it with the least child and descend.
+    if (comparator_.Less(min_heap_[least_child]->current_row(),
+            min_heap_[parent_index]->current_row())) {
+      iter_swap(min_heap_.begin() + least_child, min_heap_.begin() + parent_index);
+      parent_index = least_child;
+    } else {
+      break;  // Parent <= least child: nothing left to swap.
+    }
+    left_index = 2 * parent_index + 1;
+    right_index = left_index + 1;
+  }
+}
+
+}
\ No newline at end of file
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index 2331ab777..0d5d9c216 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -19,118 +19,78 @@
 #include "runtime/tuple-row.h"
 #include "util/runtime-profile-counters.h"
 #include "util/tuple-row-compare.h"
+#include "runtime/fragment-state.h"
+#include "codegen/llvm-codegen.h"
 
 #include "common/names.h"
 
 namespace impala {
 
-/// SortedRunWrapper returns individual rows in a batch obtained from a sorted input run
-/// (a RunBatchSupplierFn). Used as the heap element in the min heap maintained by the
-/// merger.
-/// Advance() advances the row supplier to the next row in the input batch and retrieves
-/// the next batch from the input if the current input batch is exhausted. Transfers
-/// ownership from the current input batch to an output batch if requested.
-class SortedRunMerger::SortedRunWrapper {
- public:
-  /// Construct an instance from a sorted input run.
-  SortedRunWrapper(SortedRunMerger* parent, const RunBatchSupplierFn& sorted_run)
-    : sorted_run_(sorted_run),
-      input_row_batch_(NULL),
-      input_row_batch_index_(-1),
-      parent_(parent) {
-  }
+SortedRunMerger::SortedRunWrapper::SortedRunWrapper(SortedRunMerger* parent,
+    const RunBatchSupplierFn& sorted_run)
+  : sorted_run_(sorted_run),
+    input_row_batch_(nullptr),  // Set by Init(); nullptr again once the run hits eos.
+    input_row_batch_index_(-1),  // Advance() pre-increments, so row 0 comes first.
+    parent_(parent) {
+}
 
-  /// Retrieves the first batch of sorted rows from the run.
-  Status Init(bool* eos) {
-    *eos = false;
-    RETURN_IF_ERROR(sorted_run_(&input_row_batch_));
-    if (input_row_batch_ == NULL) {
-      *eos = true;
-      return Status::OK();
-    }
-    RETURN_IF_ERROR(Advance(NULL, eos));
+Status SortedRunMerger::SortedRunWrapper::Init(bool* eos) {
+  *eos = false;
+  RETURN_IF_ERROR(sorted_run_(&input_row_batch_));
+  if (input_row_batch_ == nullptr) {  // The run supplied no batch at all.
+    *eos = true;
     return Status::OK();
   }
+  RETURN_IF_ERROR(Advance(nullptr, eos));  // First row; skips any empty batches.
+  return Status::OK();
+}
 
-  /// Increment the current row index. If the current input batch is exhausted, fetch the
-  /// next one from the sorted run. Transfer ownership to transfer_batch if not NULL.
-  Status Advance(RowBatch* transfer_batch, bool* eos) {
-    DCHECK(input_row_batch_ != NULL);
-    ++input_row_batch_index_;
-    if (input_row_batch_index_ < input_row_batch_->num_rows()) {
-      *eos = false;
-      return Status::OK();
-    }
-
-    // Iterate until we hit eos or get a non-empty batch.
-    do {
-      // Make sure to transfer resources from every batch received from 'sorted_run_'.
-      if (transfer_batch != NULL) {
-        DCHECK_ENUM_EQ(
-            RowBatch::FlushMode::NO_FLUSH_RESOURCES, input_row_batch_->flush_mode())
-            << "Run batch suppliers that flush resources must use a deep-copying merger";
-        input_row_batch_->TransferResourceOwnership(transfer_batch);
-      }
-
-      {
-        ScopedTimer<MonotonicStopWatch> timer(parent_->get_next_batch_timer_);
-        RETURN_IF_ERROR(sorted_run_(&input_row_batch_));
-      }
-    } while (input_row_batch_ != NULL && input_row_batch_->num_rows() == 0);
-
-    *eos = input_row_batch_ == NULL;
-    input_row_batch_index_ = 0;
+Status SortedRunMerger::SortedRunWrapper::Advance(RowBatch* transfer_batch, bool* eos) {
+  DCHECK(input_row_batch_ != nullptr);
+  ++input_row_batch_index_;
+  if (input_row_batch_index_ < input_row_batch_->num_rows()) {
+    *eos = false;
     return Status::OK();
   }
 
-  TupleRow* current_row() const {
-    return input_row_batch_->GetRow(input_row_batch_index_);
-  }
-
- private:
-  friend class SortedRunMerger;
-
-  /// The run from which this object supplies rows.
-  RunBatchSupplierFn sorted_run_;
-
-  /// The current input batch being processed.
-  RowBatch* input_row_batch_;
+  // Iterate until we hit eos or get a non-empty batch.
+  do {
+    // Make sure to transfer resources from every batch received from 'sorted_run_'.
+    if (transfer_batch != nullptr) {
+      DCHECK_ENUM_EQ(
+          RowBatch::FlushMode::NO_FLUSH_RESOURCES, input_row_batch_->flush_mode())
+          << "Run batch suppliers that flush resources must use a deep-copying merger";
+      input_row_batch_->TransferResourceOwnership(transfer_batch);
+    }
 
-  /// Index into input_row_batch_ of the current row being processed.
-  int input_row_batch_index_;
+    {
+      ScopedTimer<MonotonicStopWatch> timer(parent_->get_next_batch_timer_);
+      RETURN_IF_ERROR(sorted_run_(&input_row_batch_));
+    }
+  } while (input_row_batch_ != nullptr && input_row_batch_->num_rows() == 0);
 
-  /// The parent merger instance.
-  SortedRunMerger* parent_;
-};
+  *eos = input_row_batch_ == nullptr;
+  input_row_batch_index_ = 0;
+  return Status::OK();
+}
 
 void SortedRunMerger::Heapify(int parent_index) {
-  int left_index = 2 * parent_index + 1;
-  int right_index = left_index + 1;
-  if (left_index >= min_heap_.size()) return;
-  int least_child;
-  // Find the least child of parent.
-  if (right_index >= min_heap_.size() ||
-      comparator_.Less(
-          min_heap_[left_index]->current_row(), min_heap_[right_index]->current_row())) {
-    least_child = left_index;
-  } else {
-    least_child = right_index;
-  }
+  const HeapifyHelperFn heapify_helper_fn = codegend_heapify_helper_fn_.load();
 
-  // If the parent is out of place, swap it with the least child and invoke
-  // Heapify recursively.
-  if (comparator_.Less(min_heap_[least_child]->current_row(),
-          min_heap_[parent_index]->current_row())) {
-    iter_swap(min_heap_.begin() + least_child, min_heap_.begin() + parent_index);
-    Heapify(least_child);
+  if (heapify_helper_fn != nullptr) {
+    heapify_helper_fn(this, parent_index);
+  } else {
+    HeapifyHelper(parent_index);
   }
 }
 
 SortedRunMerger::SortedRunMerger(const TupleRowComparator& comparator,
-    const RowDescriptor* row_desc, RuntimeProfile* profile, bool deep_copy_input)
+    const RowDescriptor* row_desc, RuntimeProfile* profile, bool deep_copy_input,
+    const CodegenFnPtr<HeapifyHelperFn>& codegend_heapify_helper_fn)
   : comparator_(comparator),
     input_row_desc_(row_desc),
-    deep_copy_input_(deep_copy_input) {
+    deep_copy_input_(deep_copy_input),
+    codegend_heapify_helper_fn_(codegend_heapify_helper_fn) {
   get_next_timer_ = ADD_TIMER(profile, "MergeGetNext");
   get_next_batch_timer_ = ADD_TIMER(profile, "MergeGetNextBatch");
 }
@@ -140,7 +100,7 @@ Status SortedRunMerger::Prepare(const vector<RunBatchSupplierFn>& input_runs) {
   min_heap_.reserve(input_runs.size());
   for (const RunBatchSupplierFn& input_run: input_runs) {
     SortedRunWrapper* new_elem = pool_.Add(new SortedRunWrapper(this, input_run));
-    DCHECK(new_elem != NULL);
+    DCHECK(new_elem != nullptr);
     bool empty;
     RETURN_IF_ERROR(new_elem->Init(&empty));
     if (!empty) min_heap_.push_back(new_elem);
@@ -182,14 +142,40 @@ Status SortedRunMerger::AdvanceMinRow(RowBatch* transfer_batch) {
   bool min_run_complete;
   // Advance to the next element in min. output_batch is supplied to transfer
   // resource ownership if the input batch in min is exhausted.
-  RETURN_IF_ERROR(min->Advance(deep_copy_input_ ? NULL : transfer_batch,
+  RETURN_IF_ERROR(min->Advance(deep_copy_input_ ? nullptr : transfer_batch,
       &min_run_complete));
   if (min_run_complete) {
     // Remove the element from the heap.
     iter_swap(min_heap_.begin(), min_heap_.end() - 1);
     min_heap_.pop_back();
   }
-  if (!min_heap_.empty()) Heapify(0);
+  if (!min_heap_.empty()) {
+    Heapify(0);
+  }
+  return Status::OK();
+}
+
+const char* SortedRunMerger::LLVM_CLASS_NAME = "class.impala::SortedRunMerger";
+
+Status SortedRunMerger::Codegen(FragmentState* state, llvm::Function* compare_fn,
+      CodegenFnPtr<HeapifyHelperFn>* codegend_fn) {
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != nullptr);
+  // Look up the IR of HeapifyHelper(), registered in gen_ir_descriptions.py.
+  llvm::Function* fn = codegen->GetFunction(
+      IRFunction::SORTED_RUN_MERGER_HEAPIFY_HELPER, true);
+  DCHECK(fn != nullptr);
+  // Swap HeapifyHelper()'s 2 compare calls (one per Less()) for 'compare_fn'.
+  int replaced =
+      codegen->ReplaceCallSites(fn, compare_fn, TupleRowComparator::COMPARE_SYMBOL);
+  DCHECK_REPLACE_COUNT(replaced, 2) << LlvmCodeGen::Print(fn);
+  // A nullptr result means finalization failed; 'codegend_fn' is left untouched.
+  fn = codegen->FinalizeFunction(fn);
+  if (fn == nullptr) {
+    return Status("SortedRunMerger::Codegen(): failed to finalize function");
+  }
+  codegen->AddFunctionToJit(fn, codegend_fn);
+
   return Status::OK();
 }
 
diff --git a/be/src/runtime/sorted-run-merger.h b/be/src/runtime/sorted-run-merger.h
index 7834b9c39..ba3186248 100644
--- a/be/src/runtime/sorted-run-merger.h
+++ b/be/src/runtime/sorted-run-merger.h
@@ -23,6 +23,9 @@
 #include "common/object-pool.h"
 #include "util/runtime-profile.h"
 
+#include "runtime/row-batch.h"
+#include "util/tuple-row-compare.h"
+
 namespace impala {
 
 class RowBatch;
@@ -53,9 +56,11 @@ class SortedRunMerger {
   /// batch being returned. The returned batch can have any number of rows (including
   /// zero).
   typedef boost::function<Status (RowBatch**)> RunBatchSupplierFn;
+  typedef void (*HeapifyHelperFn)(SortedRunMerger*, int);
 
   SortedRunMerger(const TupleRowComparator& comparator, const RowDescriptor* row_desc,
-      RuntimeProfile* profile, bool deep_copy_input);
+      RuntimeProfile* profile, bool deep_copy_input,
+      const CodegenFnPtr<HeapifyHelperFn>& codegend_heapify_helper_fn);
 
   /// Prepare this merger to merge and return rows from the sorted runs in 'input_runs'.
   /// Retrieves the first batch from each run and sets up the binary heap implementing
@@ -65,6 +70,18 @@ class SortedRunMerger {
   /// Return the next batch of sorted rows from this merger.
   Status GetNext(RowBatch* output_batch, bool* eos);
 
+  /// Attempts to codegen HeapifyHelper(). On success, stores the resulting function in
+  /// 'codegend_fn' and returns Status::OK(). If finalizing the function fails, returns
+  /// Status("SortedRunMerger::Codegen(): failed to finalize function").
+  /// 'compare_fn' is the codegened TupleRowComparator compare function. It replaces
+  /// the calls to the interpreted compare function inside HeapifyHelper(), i.e. the
+  /// comparisons that order the merge heap.
+  static Status Codegen(FragmentState* state, llvm::Function* compare_fn,
+      CodegenFnPtr<HeapifyHelperFn>* codegend_fn);
+
+  /// Class name in LLVM IR.
+  static const char* LLVM_CLASS_NAME;
+
  private:
   class SortedRunWrapper;
 
@@ -81,6 +98,8 @@ class SortedRunMerger {
   /// restore the heap property (i.e. swap elements so parent <= children).
   void Heapify(int parent_index);
 
+  void HeapifyHelper(int parent_index);
+
   /// The binary min-heap used to merge rows from the sorted input runs. Since the heap is
   /// stored in a 0-indexed array, the 0-th element is the minimum element in the heap,
   /// and the children of the element at index i are 2*i+1 and 2*i+2. The heap property is
@@ -108,5 +127,48 @@ class SortedRunMerger {
 
   /// Times calls to get the next batch of rows from the input run.
   RuntimeProfile::Counter* get_next_batch_timer_;
+
+  /// A reference to the codegened SortedRunMerger::HeapifyHelper(), e.g. one stored in
+  /// SortPlanNode or ExchangePlanNode. The referenced object must outlive this merger.
+  const CodegenFnPtr<HeapifyHelperFn>& codegend_heapify_helper_fn_;
+};
+
+/// SortedRunWrapper returns individual rows in a batch obtained from a sorted input run
+/// (a RunBatchSupplierFn). Used as the heap element in the min heap maintained by the
+/// merger.
+/// Advance() advances the row supplier to the next row in the input batch and retrieves
+/// the next batch from the input if the current input batch is exhausted. Transfers
+/// ownership from the current input batch to an output batch if requested.
+class SortedRunMerger::SortedRunWrapper {
+ public:
+  /// Construct an instance from a sorted input run.
+  SortedRunWrapper(SortedRunMerger* parent, const RunBatchSupplierFn& sorted_run);
+
+  /// Retrieves the first batch of sorted rows; sets *eos if the run has no rows.
+  Status Init(bool* eos);
+
+  /// Increment the current row index. If the current input batch is exhausted, fetch the
+  /// next one from the sorted run. Transfer ownership to transfer_batch if not nullptr.
+  Status Advance(RowBatch* transfer_batch, bool* eos);
+
+  TupleRow* current_row() const {  // Requires a current batch, i.e. not at eos.
+    return input_row_batch_->GetRow(input_row_batch_index_);
+  }
+
+ private:
+  friend class SortedRunMerger;
+
+  /// The run from which this object supplies rows.
+  RunBatchSupplierFn sorted_run_;
+
+  /// The current input batch being processed; nullptr before Init() and at eos.
+  RowBatch* input_row_batch_;
+
+  /// Index into input_row_batch_ of the current row being processed (-1 before Init()).
+  int input_row_batch_index_;
+
+  /// The parent merger; its get_next_batch_timer_ times batch fetches in Advance().
+  SortedRunMerger* parent_;
 };
+
 }
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 5cb99d8e5..5f853d50b 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -41,6 +41,7 @@ namespace impala {
 
 // Number of pinned pages required for a merge with fixed-length data only.
 const int MIN_BUFFERS_PER_MERGE = 3;
+const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn> Sorter::default_heapify_helper_fn_{};
 
 Status Sorter::Page::Init(Sorter* sorter) {
   const BufferPool::BufferHandle* page_buffer;
@@ -770,6 +771,7 @@ Sorter::Sorter(const TupleRowComparatorConfig& tuple_row_comparator_config,
     int64_t page_len, RuntimeProfile* profile, RuntimeState* state,
     const string& node_label, bool enable_spilling,
     const CodegenFnPtr<SortHelperFn>& codegend_sort_helper_fn,
+    const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn>& codegend_heapify_helper_fn,
     int64_t estimated_input_size)
   : node_label_(node_label),
     state_(state),
@@ -778,6 +780,7 @@ Sorter::Sorter(const TupleRowComparatorConfig& tuple_row_comparator_config,
     compare_less_than_(nullptr),
     in_mem_tuple_sorter_(nullptr),
     codegend_sort_helper_fn_(codegend_sort_helper_fn),
+    codegend_heapify_helper_fn_(codegend_heapify_helper_fn),
     buffer_pool_client_(buffer_pool_client),
     page_len_(page_len),
     has_var_len_slots_(false),
@@ -1155,7 +1158,8 @@ Status Sorter::CreateMerger(int num_runs) {
   // from the runs being merged. This is unnecessary overhead that is not required if we
   // correctly transfer resources.
   merger_.reset(
-      new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_, true));
+      new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_, true,
+          codegend_heapify_helper_fn_));
 
   vector<function<Status (RowBatch**)>> merge_runs;
   merge_runs.reserve(num_runs);
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index a4f5b71a2..6b9785959 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -23,6 +23,7 @@
 #include "runtime/bufferpool/buffer-pool.h"
 #include "util/runtime-profile.h"
 #include "util/tuple-row-compare.h"
+#include "runtime/sorted-run-merger.h"
 
 namespace impala {
 
@@ -119,6 +120,8 @@ class Sorter {
       RuntimeProfile* profile, RuntimeState* state, const std::string& node_label,
       bool enable_spilling,
       const CodegenFnPtr<SortHelperFn>& codegend_sort_helper_fn,
+      const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn>& codegend_heapify_helper_fn =
+            default_heapify_helper_fn_,
       int64_t estimated_input_size = -1);
   ~Sorter();
 
@@ -255,9 +258,18 @@ class Sorter {
   boost::scoped_ptr<TupleSorter> in_mem_tuple_sorter_;
 
   /// A reference to the codegened version of TupleSorter::SortHelper() that is stored
-  /// inside SortPlanNode and PartialSortPlanNode.
+  /// inside SortPlanNode, PartialSortPlanNode and TopNPlanNode.
   const CodegenFnPtr<SortHelperFn>& codegend_sort_helper_fn_;
 
+  /// A reference to the codegened SortedRunMerger::HeapifyHelper() that is stored
+  /// inside SortPlanNode, or to 'default_heapify_helper_fn_' if none is provided.
+  const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn>& codegend_heapify_helper_fn_;
+
+  /// A CodegenFnPtr holding nullptr. Used as the default constructor argument when
+  /// no codegened HeapifyHelper() is provided, e.g. when the merger is not needed.
+  /// A merger created with it falls back to the interpreted HeapifyHelper().
+  static const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn> default_heapify_helper_fn_;
+
   /// Client used to allocate pages from the buffer pool. Not owned.
   BufferPool::ClientHandle* const buffer_pool_client_;