You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/29 00:32:36 UTC

[4/6] incubator-impala git commit: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

The main outcome of this patch is to split out PhjBuilder from
PartitionedHashJoinNode, which manages the build partitions for the
join and implements the DataSink interface.

A lot of this work is fairly mechanical refactoring: dividing the state
between the two classes and duplicating it where appropriate. One major
change is that probe partitions need to be separated from build
partitions.

This required some significant algorithmic changes to memory management
for probe partition buffers: memory management for probe partitions was
entangled with the build partitions: small buffers were allocated for
the probe partitions even for non-spilling joins and the join could
spill additional partitions during probing if the probe partitions needed
to switch from small to I/O buffers. The changes made were:
- Probe partitions are only initialized after the build is partitioned, and
  only for spilled build partitions.
- Probe partitions never use small buffers: once the initial write
  buffer is allocated, appending to the probe partition never fails.
- All probe partition allocation is done after partitioning the build
  and before processing the probe input during the same phase as hash
  table building. (Aside from NAAJ partitions which are allocated
  upfront).

The probe partition changes necessitated a change in
BufferedTupleStream: allocation of write blocks is now explicit via the
PrepareForWrite() API.

Testing:
Ran exhaustive build and local stress test.

Memory Usage:
Ran stress test binary search locally for TPC-DS SF-1 and TPC-H SF-20.
No regressions on TPC-DS. TPC-H either stayed the same or improved in
min memory requirement without spilling, but the min memory requirement
with spilling regressed in some cases. I investigated each of the
significant regressions on TPC-H and determined that they were all due
to exec nodes racing for spillable or non-spillable memory. None of them
were cases where exec nodes got their minimum reservation and failed to
execute the spilling algorithm correctly.

Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Reviewed-on: http://gerrit.cloudera.org:8080/3873
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1ccd83b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1ccd83b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1ccd83b2

Branch: refs/heads/master
Commit: 1ccd83b2d0cf60c3b711a5d9bd758438ab846150
Parents: b7d107a
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jun 2 17:48:15 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Sep 28 23:07:37 2016 +0000

----------------------------------------------------------------------
 .clang-format                                   |    3 +-
 be/src/codegen/gen_ir_descriptions.py           |    4 +-
 be/src/codegen/impala-ir.cc                     |    1 +
 be/src/exec/CMakeLists.txt                      |    2 +
 be/src/exec/analytic-eval-node.cc               |   15 +-
 be/src/exec/blocking-join-node.cc               |    4 +-
 be/src/exec/blocking-join-node.h                |    4 +-
 be/src/exec/data-sink.cc                        |   16 +-
 be/src/exec/hash-join-node.cc                   |    2 +-
 be/src/exec/nested-loop-join-builder.cc         |    3 +-
 be/src/exec/nested-loop-join-builder.h          |   14 +-
 be/src/exec/nested-loop-join-node.cc            |    6 +-
 be/src/exec/partitioned-aggregation-node.cc     |   34 +-
 be/src/exec/partitioned-hash-join-builder-ir.cc |  110 ++
 be/src/exec/partitioned-hash-join-builder.cc    |  900 ++++++++++++
 be/src/exec/partitioned-hash-join-builder.h     |  460 ++++++
 be/src/exec/partitioned-hash-join-node-ir.cc    |  135 +-
 be/src/exec/partitioned-hash-join-node.cc       | 1349 +++++-------------
 be/src/exec/partitioned-hash-join-node.h        |  513 +++----
 be/src/exec/partitioned-hash-join-node.inline.h |    6 -
 be/src/runtime/buffered-block-mgr.cc            |    9 +-
 be/src/runtime/buffered-tuple-stream-test.cc    |   29 +-
 be/src/runtime/buffered-tuple-stream.cc         |   71 +-
 be/src/runtime/buffered-tuple-stream.h          |   24 +-
 be/src/util/runtime-profile.cc                  |   37 +-
 be/src/util/runtime-profile.h                   |   13 +-
 tests/stress/concurrent_select.py               |    6 +-
 27 files changed, 2272 insertions(+), 1498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/.clang-format
----------------------------------------------------------------------
diff --git a/.clang-format b/.clang-format
index 2de4b96..c0b9030 100644
--- a/.clang-format
+++ b/.clang-format
@@ -1,6 +1,7 @@
 BasedOnStyle: Google
 AlignAfterOpenBracket: 'false'
 AlignOperands: 'false'
+AllowShortCaseLabelsOnASingleLine: 'true'
 AllowShortFunctionsOnASingleLine: 'Inline'
 AllowShortIfStatementsOnASingleLine: 'true'
 BreakBeforeBinaryOperators: 'NonAssignment'
@@ -11,4 +12,4 @@ ContinuationIndentWidth: '4'
 DerivePointerAlignment: 'false'
 PenaltyBreakBeforeFirstCallParameter: '99999999'
 SpacesBeforeTrailingComments: '1'
-Standard: 'Cpp11'
\ No newline at end of file
+Standard: 'Cpp11'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index a12d73d..60244a7 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -81,7 +81,7 @@ ir_functions = [
   ["HASH_MURMUR", "IrMurmurHash"],
   ["HASH_JOIN_PROCESS_BUILD_BATCH", "12HashJoinNode17ProcessBuildBatch"],
   ["HASH_JOIN_PROCESS_PROBE_BATCH", "12HashJoinNode17ProcessProbeBatch"],
-  ["PHJ_PROCESS_BUILD_BATCH", "23PartitionedHashJoinNode17ProcessBuildBatch"],
+  ["PHJ_PROCESS_BUILD_BATCH", "10PhjBuilder17ProcessBuildBatch"],
   ["PHJ_PROCESS_PROBE_BATCH_INNER_JOIN", "ProcessProbeBatchILi0"],
   ["PHJ_PROCESS_PROBE_BATCH_LEFT_OUTER_JOIN", "ProcessProbeBatchILi1"],
   ["PHJ_PROCESS_PROBE_BATCH_LEFT_SEMI_JOIN", "ProcessProbeBatchILi2"],
@@ -91,7 +91,7 @@ ir_functions = [
   ["PHJ_PROCESS_PROBE_BATCH_RIGHT_SEMI_JOIN", "ProcessProbeBatchILi6"],
   ["PHJ_PROCESS_PROBE_BATCH_RIGHT_ANTI_JOIN", "ProcessProbeBatchILi7"],
   ["PHJ_PROCESS_PROBE_BATCH_FULL_OUTER_JOIN", "ProcessProbeBatchILi8"],
-  ["PHJ_INSERT_BATCH", "9Partition11InsertBatch"],
+  ["PHJ_INSERT_BATCH", "10PhjBuilder9Partition11InsertBatch"],
   ["HASH_TABLE_GET_HASH_SEED", "GetHashSeed"],
   ["HASH_TABLE_GET_BUILD_EXPR_CTX", "HashTableCtx15GetBuildExprCtx"],
   ["HASH_TABLE_GET_PROBE_EXPR_CTX", "HashTableCtx15GetProbeExprCtx"],

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 185d6c2..9b14058 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -29,6 +29,7 @@
 #include "exec/hdfs-parquet-scanner-ir.cc"
 #include "exec/hdfs-scanner-ir.cc"
 #include "exec/partitioned-aggregation-node-ir.cc"
+#include "exec/partitioned-hash-join-builder-ir.cc"
 #include "exec/partitioned-hash-join-node-ir.cc"
 #include "exec/topn-node-ir.cc"
 #include "exprs/aggregate-functions-ir.cc"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 8f12656..25f2202 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -73,6 +73,8 @@ add_library(Exec
   parquet-metadata-utils.cc
   partitioned-aggregation-node.cc
   partitioned-aggregation-node-ir.cc
+  partitioned-hash-join-builder.cc
+  partitioned-hash-join-builder-ir.cc
   partitioned-hash-join-node.cc
   partitioned-hash-join-node-ir.cc
   kudu-scanner.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index c9e35d8..21a0805 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -32,11 +32,6 @@
 
 static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB
 
-const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
-    "Failed to acquire initial read buffer for analytic function evaluation. Reducing "
-    "query concurrency or increasing the memory limit may help this query to complete "
-    "successfully.";
-
 using namespace strings;
 
 namespace impala {
@@ -200,10 +195,15 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
       state->block_mgr(), client_, false /* use_initial_small_buffers */,
       true /* read_write */);
   RETURN_IF_ERROR(input_stream_->Init(id(), runtime_profile(), true));
+  bool got_write_buffer;
+  RETURN_IF_ERROR(input_stream_->PrepareForWrite(&got_write_buffer));
+  if (!got_write_buffer) {
+    return state->block_mgr()->MemLimitTooLowError(client_, id());
+  }
   bool got_read_buffer;
   RETURN_IF_ERROR(input_stream_->PrepareForRead(true, &got_read_buffer));
   if (!got_read_buffer) {
-    return mem_tracker()->MemLimitExceeded(state, PREPARE_FOR_READ_FAILED_ERROR_MSG);
+    return state->block_mgr()->MemLimitTooLowError(client_, id());
   }
 
   DCHECK_EQ(evaluators_.size(), fn_ctxs_.size());
@@ -367,7 +367,8 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
     // the stream and continue writing/reading in unpinned mode.
     // TODO: Consider re-pinning later if the output stream is fully consumed.
     RETURN_IF_ERROR(status);
-    RETURN_IF_ERROR(input_stream_->UnpinStream());
+    RETURN_IF_ERROR(
+        input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
     VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
     if (!input_stream_->AddRow(row, &status)) {
       // Rows should be added in unpinned mode unless an error occurs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 309bde4..b114451 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -174,8 +174,8 @@ Status BlockingJoinNode::Open(RuntimeState* state) {
   return Status::OK();
 }
 
-Status BlockingJoinNode::ConstructBuildAndOpenProbe(RuntimeState* state,
-    DataSink* build_sink) {
+Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
+    RuntimeState* state, DataSink* build_sink) {
   // If this node is not inside a subplan and can get a thread token, initiate the
   // construction of the build-side table in a separate thread, so that the left child
   // can do any initialisation in parallel. Otherwise, do this in the main thread.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 14b1c72..4000b22 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -104,7 +104,7 @@ class BlockingJoinNode : public ExecNode {
   MonotonicStopWatch built_probe_overlap_stop_watch_;
 
   /// Processes the build-side input.
-  /// Called from ConstructBuildAndOpenProbe() if the subclass does not provide a
+  /// Called from ProcessBuildInputAndOpenProbe() if the subclass does not provide a
   /// DataSink to consume the build input.
   /// Note that this can be called concurrently with Open'ing the left child to
   /// increase parallelism. If, for example, the left child is another join node,
@@ -116,7 +116,7 @@ class BlockingJoinNode : public ExecNode {
   /// if the plan shape and thread token availability permit it.
   /// If 'build_sink' is non-NULL, sends the build-side input to 'build_sink'. Otherwise
   /// calls ProcessBuildInput on the subclass.
-  Status ConstructBuildAndOpenProbe(RuntimeState* state, DataSink* build_sink);
+  Status ProcessBuildInputAndOpenProbe(RuntimeState* state, DataSink* build_sink);
 
   /// Helper function to process the build input by sending it to a DataSink.
   /// ASYNC_BUILD enables timers that impose some overhead but are required if the build

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 8c8b2dc..b6ec0ee 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -22,25 +22,30 @@
 
 #include "common/logging.h"
 #include "exec/exec-node.h"
-#include "exec/hdfs-table-sink.h"
 #include "exec/hbase-table-sink.h"
+#include "exec/hdfs-table-sink.h"
 #include "exec/kudu-table-sink.h"
 #include "exec/kudu-util.h"
 #include "exprs/expr.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gutil/strings/substitute.h"
 #include "runtime/data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "util/container-util.h"
 
 #include "common/names.h"
 
+using strings::Substitute;
+
 namespace impala {
 
 DataSink::DataSink(const RowDescriptor& row_desc) :
     closed_(false), row_desc_(row_desc), mem_tracker_(NULL) {}
 
-DataSink::~DataSink() {}
+DataSink::~DataSink() {
+  DCHECK(closed_);
+}
 
 Status DataSink::CreateDataSink(ObjectPool* pool,
     const TDataSink& thrift_sink, const vector<TExpr>& output_exprs,
@@ -153,15 +158,18 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
   DCHECK(mem_tracker != NULL);
   profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), GetName()));
   mem_tracker_ = mem_tracker;
-  expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker, false));
+  expr_mem_tracker_.reset(
+      new MemTracker(-1, Substitute("$0 Exprs", GetName()), mem_tracker, false));
   return Status::OK();
 }
 
 void DataSink::Close(RuntimeState* state) {
+  if (closed_) return;
   if (expr_mem_tracker_ != NULL) {
     expr_mem_tracker_->UnregisterFromParent();
     expr_mem_tracker_.reset();
   }
+  closed_ = true;
 }
 
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 52ba1d1..9106159 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -205,7 +205,7 @@ Status HashJoinNode::Open(RuntimeState* state) {
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
 
-  RETURN_IF_ERROR(BlockingJoinNode::ConstructBuildAndOpenProbe(state, NULL));
+  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, NULL));
   RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
   InitGetNext();
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/nested-loop-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index 4315b93..58610e7 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -32,8 +32,7 @@ NljBuilder::NljBuilder(const RowDescriptor& row_desc, RuntimeState* state,
     : DataSink(row_desc), build_batch_cache_(row_desc, state->batch_size(),
       mem_tracker) {}
 
-Status NljBuilder::Prepare(RuntimeState* state,
-    MemTracker* mem_tracker) {
+Status NljBuilder::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/nested-loop-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.h b/be/src/exec/nested-loop-join-builder.h
index a64315a..3fb5348 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -39,13 +39,13 @@ class NljBuilder : public DataSink {
  public:
   NljBuilder(const RowDescriptor& row_desc, RuntimeState* state, MemTracker* mem_tracker);
 
-  /// Implementations of DataSink methods.
-  virtual std::string GetName() { return "Nested Loop Join Build"; }
-  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker);
-  virtual Status Open(RuntimeState* state);
-  virtual Status Send(RuntimeState* state, RowBatch* batch);
-  virtual Status FlushFinal(RuntimeState* state);
-  virtual void Close(RuntimeState* state);
+  /// Implementations of DataSink interface methods.
+  virtual std::string GetName() override { return "Nested Loop Join Builder"; }
+  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status Send(RuntimeState* state, RowBatch* batch) override;
+  virtual Status FlushFinal(RuntimeState* state) override;
+  virtual void Close(RuntimeState* state) override;
 
   /// Reset the builder to the same state as it was in after calling Open().
   void Reset();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 87fbf62..0a115c6 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -81,7 +81,7 @@ Status NestedLoopJoinNode::Open(RuntimeState* state) {
     build_batches_ = builder_->input_build_batches();
   } else {
     RETURN_IF_ERROR(
-        BlockingJoinNode::ConstructBuildAndOpenProbe(state, builder_.get()));
+        BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
     build_batches_ = builder_->GetFinalBuildBatches();
     if (matching_build_rows_ != NULL) {
       RETURN_IF_ERROR(ResetMatchingBuildRows(state, build_batches_->total_num_rows()));
@@ -102,9 +102,9 @@ Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(Expr::Prepare(
       join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker()));
 
-  builder_.reset(
-      new NljBuilder(child(1)->row_desc(), state, mem_tracker()));
+  builder_.reset(new NljBuilder(child(1)->row_desc(), state, mem_tracker()));
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
+  runtime_profile()->PrependChild(builder_->profile());
 
   // For some join modes we need to record the build rows with matches in a bitmap.
   if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::RIGHT_SEMI_JOIN ||

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index ba2d9f7..f926725 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -285,6 +285,11 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
         state->block_mgr(), block_mgr_client_, false /* use_initial_small_buffers */,
         false /* read_write */));
     RETURN_IF_ERROR(serialize_stream_->Init(id(), runtime_profile(), false));
+    bool got_buffer;
+    RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer));
+    if (!got_buffer) {
+      return state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id());
+    }
     DCHECK(serialize_stream_->has_write_block());
   }
 
@@ -755,6 +760,12 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
       false /* read_write */, external_varlen_slots));
   RETURN_IF_ERROR(
       aggregated_row_stream->Init(parent->id(), parent->runtime_profile(), true));
+  bool got_buffer;
+  RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
+  if (!got_buffer) {
+    return parent->state_->block_mgr()->MemLimitTooLowError(
+        parent->block_mgr_client_, parent->id());
+  }
 
   if (!parent->is_streaming_preagg_) {
     unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
@@ -764,6 +775,12 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
     // This stream is only used to spill, no need to ever have this pinned.
     RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), parent->runtime_profile(),
         false));
+    // TODO: allocate this buffer later only if we spill the partition.
+    RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
+    if (!got_buffer) {
+      return parent->state_->block_mgr()->MemLimitTooLowError(
+          parent->block_mgr_client_, parent->id());
+    }
     DCHECK(unaggregated_row_stream->has_write_block());
   }
   return Status::OK();
@@ -844,6 +861,14 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
         false /* read_write */));
     status = parent->serialize_stream_->Init(parent->id(), parent->runtime_profile(),
         false);
+    if (status.ok()) {
+      bool got_buffer;
+      status = parent->serialize_stream_->PrepareForWrite(&got_buffer);
+      if (status.ok() && !got_buffer) {
+        status = parent->state_->block_mgr()->MemLimitTooLowError(
+            parent->block_mgr_client_, parent->id());
+      }
+    }
     if (!status.ok()) {
       hash_tbl->Close();
       hash_tbl.reset();
@@ -887,7 +912,8 @@ Status PartitionedAggregationNode::Partition::Spill() {
   // TODO: when not repartitioning, don't leave the write block pinned.
   DCHECK(!got_buffer || aggregated_row_stream->has_write_block())
       << aggregated_row_stream->DebugString();
-  RETURN_IF_ERROR(aggregated_row_stream->UnpinStream(false));
+  RETURN_IF_ERROR(
+      aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
 
   if (got_buffer && unaggregated_row_stream->using_small_buffers()) {
     RETURN_IF_ERROR(unaggregated_row_stream->SwitchToIoBuffers(&got_buffer));
@@ -1372,8 +1398,10 @@ Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
       // TODO: we only need to do this when we have memory pressure. This might be
       // okay though since the block mgr should only write these to disk if there
       // is memory pressure.
-      RETURN_IF_ERROR(partition->aggregated_row_stream->UnpinStream(true));
-      RETURN_IF_ERROR(partition->unaggregated_row_stream->UnpinStream(true));
+      RETURN_IF_ERROR(
+          partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+      RETURN_IF_ERROR(partition->unaggregated_row_stream->UnpinStream(
+          BufferedTupleStream::UNPIN_ALL));
 
       // Push new created partitions at the front. This means a depth first walk
       // (more finely partitioned partitions are processed first). This allows us

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
new file mode 100644
index 0000000..21fd9e4
--- /dev/null
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/partitioned-hash-join-builder.h"
+
+#include "codegen/impala-ir.h"
+#include "exec/hash-table.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/raw-value.inline.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-filter.h"
+#include "util/bloom-filter.h"
+
+#include "common/names.h"
+
+using namespace impala;
+
+inline Status PhjBuilder::AppendRow(BufferedTupleStream* stream, TupleRow* row) {
+  Status status;
+  if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
+  RETURN_IF_ERROR(status);
+  return AppendRowStreamFull(stream, row);
+}
+
+Status PhjBuilder::ProcessBuildBatch(
+    RowBatch* build_batch, HashTableCtx* ctx, bool build_filters) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ctx->expr_values_cache();
+  expr_vals_cache->Reset();
+  FOREACH_ROW(build_batch, 0, build_batch_iter) {
+    TupleRow* build_row = build_batch_iter.Get();
+    if (!ctx->EvalAndHashBuild(build_row)) {
+      if (null_aware_partition_ != NULL) {
+        // TODO: remove with codegen/template
+        // If we are NULL aware and this build row has NULL in the eq join slot,
+        // append it to the null_aware partition. We will need it later.
+        RETURN_IF_ERROR(AppendRow(null_aware_partition_->build_rows(), build_row));
+      }
+      continue;
+    }
+    if (build_filters) {
+      DCHECK_EQ(ctx->level(), 0)
+          << "Runtime filters should not be built during repartitioning.";
+      for (const FilterContext& ctx : filters_) {
+        // TODO: codegen expr evaluation and hashing
+        if (ctx.local_bloom_filter == NULL) continue;
+        void* e = ctx.expr->GetValue(build_row);
+        uint32_t filter_hash = RawValue::GetHashValue(
+            e, ctx.expr->root()->type(), RuntimeFilterBank::DefaultHashSeed());
+        ctx.local_bloom_filter->Insert(filter_hash);
+      }
+    }
+    const uint32_t hash = expr_vals_cache->CurExprValuesHash();
+    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+    Partition* partition = hash_partitions_[partition_idx];
+    RETURN_IF_ERROR(AppendRow(partition->build_rows(), build_row));
+  }
+  return Status::OK();
+}
+
+bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode,
+    HashTableCtx* ht_ctx, RowBatch* batch,
+    const vector<BufferedTupleStream::RowIdx>& indices) {
+  // Compute the hash values and prefetch the hash table buckets.
+  const int num_rows = batch->num_rows();
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int prefetch_size = expr_vals_cache->capacity();
+  const BufferedTupleStream::RowIdx* row_indices = indices.data();
+  for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
+       prefetch_group_row += prefetch_size) {
+    int cur_row = prefetch_group_row;
+    expr_vals_cache->Reset();
+    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
+      if (ht_ctx->EvalAndHashBuild(batch_iter.Get())) {
+        if (prefetch_mode != TPrefetchMode::NONE) {
+          hash_tbl_->PrefetchBucket<false>(expr_vals_cache->CurExprValuesHash());
+        }
+      } else {
+        expr_vals_cache->SetRowNull();
+      }
+      expr_vals_cache->NextRow();
+    }
+    // Do the insertion.
+    expr_vals_cache->ResetForRead();
+    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
+      TupleRow* row = batch_iter.Get();
+      BufferedTupleStream::RowIdx row_idx = row_indices[cur_row];
+      if (!expr_vals_cache->IsRowNull()
+          && UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) {
+        return false;
+      }
+      expr_vals_cache->NextRow();
+      ++cur_row;
+    }
+  }
+  return true;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
new file mode 100644
index 0000000..b17bbff
--- /dev/null
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -0,0 +1,900 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/partitioned-hash-join-builder.h"
+
+#include <gutil/strings/substitute.h>
+
+#include "codegen/llvm-codegen.h"
+#include "exec/hash-table.inline.h"
+#include "exprs/expr-context.h"
+#include "exprs/expr.h"
+#include "runtime/buffered-tuple-stream.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-filter-bank.h"
+#include "runtime/runtime-filter.h"
+#include "runtime/runtime-state.h"
+#include "util/bloom-filter.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
+    "Failed to acquire initial read "
+    "buffer for stream in hash join node $0. Reducing query concurrency or increasing "
+    "the memory limit may help this query to complete successfully.";
+
+using namespace impala;
+using namespace llvm;
+using namespace strings;
+using std::unique_ptr;
+
+PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op,
+    const RowDescriptor& probe_row_desc, const RowDescriptor& build_row_desc,
+    RuntimeState* state)
+  : DataSink(build_row_desc),
+    runtime_state_(state),
+    join_node_id_(join_node_id),
+    join_op_(join_op),
+    probe_row_desc_(probe_row_desc),
+    block_mgr_client_(NULL),
+    non_empty_build_(false),
+    partitions_created_(NULL),
+    largest_partition_percent_(NULL),
+    max_partition_level_(NULL),
+    num_build_rows_partitioned_(NULL),
+    num_hash_collisions_(NULL),
+    num_hash_buckets_(NULL),
+    num_spilled_partitions_(NULL),
+    num_repartitions_(NULL),
+    partition_build_rows_timer_(NULL),
+    build_hash_table_timer_(NULL),
+    repartition_timer_(NULL),
+    null_aware_partition_(NULL),
+    process_build_batch_fn_(NULL),
+    process_build_batch_fn_level0_(NULL),
+    insert_batch_fn_(NULL),
+    insert_batch_fn_level0_(NULL) {}
+
+Status PhjBuilder::Init(RuntimeState* state,
+    const vector<TEqJoinCondition>& eq_join_conjuncts,
+    const vector<TRuntimeFilterDesc>& filters) {
+  for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
+    ExprContext* ctx;
+    RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, eq_join_conjunct.right, &ctx));
+    build_expr_ctxs_.push_back(ctx);
+    is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from);
+  }
+
+  for (const TRuntimeFilterDesc& filter : filters) {
+    // If filter propagation not enabled, only consider building broadcast joins (that may
+    // be consumed by this fragment).
+    if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL
+        && !filter.is_broadcast_join) {
+      continue;
+    }
+    if (state->query_options().disable_row_runtime_filtering
+        && !filter.applied_on_partition_columns) {
+      continue;
+    }
+    FilterContext filter_ctx;
+    filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, true);
+    RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, filter.src_expr, &filter_ctx.expr));
+    filters_.push_back(filter_ctx);
+  }
+  return Status::OK();
+}
+
+string PhjBuilder::GetName() {
+  return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id_);
+}
+
+Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
+  RETURN_IF_ERROR(
+      Expr::Prepare(build_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));
+  expr_ctxs_to_free_.insert(
+      expr_ctxs_to_free_.end(), build_expr_ctxs_.begin(), build_expr_ctxs_.end());
+
+  for (const FilterContext& ctx : filters_) {
+    RETURN_IF_ERROR(ctx.expr->Prepare(state, row_desc_, expr_mem_tracker_.get()));
+    expr_ctxs_to_free_.push_back(ctx.expr);
+  }
+  RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, build_expr_ctxs_,
+      HashTableStoresNulls(), is_not_distinct_from_, state->fragment_hash_seed(),
+      MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), mem_tracker_, &ht_ctx_));
+  RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
+      Substitute("PartitionedHashJoin id=$0 builder=$1", join_node_id_, this),
+      MinRequiredBuffers(), true, mem_tracker, state, &block_mgr_client_));
+
+  partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
+  largest_partition_percent_ =
+      profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
+  max_partition_level_ =
+      profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT);
+  num_build_rows_partitioned_ =
+      ADD_COUNTER(profile(), "BuildRowsPartitioned", TUnit::UNIT);
+  num_hash_collisions_ = ADD_COUNTER(profile(), "HashCollisions", TUnit::UNIT);
+  num_hash_buckets_ = ADD_COUNTER(profile(), "HashBuckets", TUnit::UNIT);
+  num_spilled_partitions_ = ADD_COUNTER(profile(), "SpilledPartitions", TUnit::UNIT);
+  num_repartitions_ = ADD_COUNTER(profile(), "NumRepartitions", TUnit::UNIT);
+  partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime");
+  build_hash_table_timer_ = ADD_TIMER(profile(), "HashTablesBuildTime");
+  repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime");
+
+  Codegen(state);
+  return Status::OK();
+}
+
+Status PhjBuilder::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
+  for (const FilterContext& filter : filters_) RETURN_IF_ERROR(filter.expr->Open(state));
+  RETURN_IF_ERROR(CreateHashPartitions(0));
+  AllocateRuntimeFilters();
+
+  if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+    RETURN_IF_ERROR(CreateAndPreparePartition(0, &null_aware_partition_));
+  }
+  return Status::OK();
+}
+
+Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(partition_build_rows_timer_);
+  bool build_filters = ht_ctx_->level() == 0 && filters_.size() > 0;
+  if (process_build_batch_fn_ == NULL) {
+    RETURN_IF_ERROR(ProcessBuildBatch(batch, ht_ctx_.get(), build_filters));
+  } else {
+    DCHECK(process_build_batch_fn_level0_ != NULL);
+    if (ht_ctx_->level() == 0) {
+      RETURN_IF_ERROR(
+          process_build_batch_fn_level0_(this, batch, ht_ctx_.get(), build_filters));
+    } else {
+      RETURN_IF_ERROR(process_build_batch_fn_(this, batch, ht_ctx_.get(), build_filters));
+    }
+  }
+
+  // Free any local allocations made during partitioning.
+  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
+  COUNTER_ADD(num_build_rows_partitioned_, batch->num_rows());
+  return Status::OK();
+}
+
+Status PhjBuilder::FlushFinal(RuntimeState* state) {
+  int64_t num_build_rows = 0;
+  for (Partition* partition : hash_partitions_) {
+    num_build_rows += partition->build_rows()->num_rows();
+  }
+
+  if (num_build_rows > 0) {
+    double largest_fraction = 0.0;
+    for (Partition* partition : hash_partitions_) {
+      largest_fraction = max(largest_fraction,
+          partition->build_rows()->num_rows() / static_cast<double>(num_build_rows));
+    }
+    COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(largest_fraction * 100));
+  }
+
+  if (VLOG_IS_ON(2)) {
+    stringstream ss;
+    ss << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:", join_node_id_,
+        hash_partitions_[0]->level(), num_build_rows);
+    for (int i = 0; i < hash_partitions_.size(); ++i) {
+      Partition* partition = hash_partitions_[i];
+      double percent = num_build_rows == 0 ? 0.0 : partition->build_rows()->num_rows()
+              * 100 / static_cast<double>(num_build_rows);
+      ss << "  " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
+         << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
+         << "    #rows:" << partition->build_rows()->num_rows() << endl;
+    }
+    VLOG(2) << ss.str();
+  }
+
+  if (ht_ctx_->level() == 0) {
+    PublishRuntimeFilters(num_build_rows);
+    non_empty_build_ |= (num_build_rows > 0);
+  }
+
+  RETURN_IF_ERROR(BuildHashTablesAndPrepareProbeStreams());
+  return Status::OK();
+}
+
+void PhjBuilder::Close(RuntimeState* state) {
+  if (closed_) return;
+  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
+  CloseAndDeletePartitions();
+  if (ht_ctx_ != NULL) ht_ctx_->Close();
+  Expr::Close(build_expr_ctxs_, state);
+  for (const FilterContext& ctx : filters_) ctx.expr->Close(state);
+  if (block_mgr_client_ != NULL) state->block_mgr()->ClearReservations(block_mgr_client_);
+  pool_.Clear();
+  DataSink::Close(state);
+  closed_ = true;
+}
+
+void PhjBuilder::Reset() {
+  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
+  non_empty_build_ = false;
+  CloseAndDeletePartitions();
+}
+
+Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) {
+  all_partitions_.emplace_back(new Partition(runtime_state_, this, level));
+  *partition = all_partitions_.back().get();
+  RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_id_, profile(), true));
+  bool got_buffer;
+  RETURN_IF_ERROR((*partition)->build_rows()->PrepareForWrite(&got_buffer));
+  if (!got_buffer) {
+    return runtime_state_->block_mgr()->MemLimitTooLowError(
+        block_mgr_client_, join_node_id_);
+  }
+  return Status::OK();
+}
+
+Status PhjBuilder::CreateHashPartitions(int level) {
+  DCHECK(hash_partitions_.empty());
+  ht_ctx_->set_level(level); // Set the hash function for partitioning input.
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    Partition* new_partition;
+    RETURN_IF_ERROR(CreateAndPreparePartition(level, &new_partition));
+    hash_partitions_.push_back(new_partition);
+  }
+  COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
+  COUNTER_SET(max_partition_level_, level);
+  return Status::OK();
+}
+
+Status PhjBuilder::AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row) {
+  Status status;
+  while (true) {
+    // Check if the stream is still using small buffers and try to switch to IO-buffers.
+    if (stream->using_small_buffers()) {
+      bool got_buffer;
+      RETURN_IF_ERROR(stream->SwitchToIoBuffers(&got_buffer));
+      if (got_buffer) {
+        if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
+        RETURN_IF_ERROR(status);
+      }
+    }
+    // We ran out of memory. Pick a partition to spill. If we ran out of unspilled
+    // partitions, SpillPartition() will return an error status.
+    RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    if (stream->AddRow(row, &status)) return Status::OK();
+    RETURN_IF_ERROR(status);
+    // Spilling one partition does not guarantee we can append a row. Keep
+    // spilling until we can append this row.
+  }
+}
+
+// TODO: can we do better with a different spilling heuristic?
+Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) {
+  DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);
+  int64_t max_freed_mem = 0;
+  int partition_idx = -1;
+
+  // Iterate over the partitions and pick the largest partition to spill.
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    Partition* candidate = hash_partitions_[i];
+    if (candidate->IsClosed()) continue;
+    if (candidate->is_spilled()) continue;
+    int64_t mem = candidate->build_rows()->bytes_in_mem(false);
+    if (candidate->hash_tbl() != NULL) {
+      // The hash table should not have matches, since we have not probed it yet.
+      // Losing match info would lead to incorrect results (IMPALA-1488).
+      DCHECK(!candidate->hash_tbl()->HasMatches());
+      mem += candidate->hash_tbl()->ByteSize();
+    }
+    if (mem > max_freed_mem) {
+      max_freed_mem = mem;
+      partition_idx = i;
+    }
+  }
+
+  if (partition_idx == -1) {
+    // Could not find a partition to spill. This means the mem limit was just too low.
+    return runtime_state_->block_mgr()->MemLimitTooLowError(
+        block_mgr_client_, join_node_id_);
+  }
+
+  VLOG(2) << "Spilling partition: " << partition_idx << endl << DebugString();
+  Partition* build_partition = hash_partitions_[partition_idx];
+  RETURN_IF_ERROR(build_partition->Spill(mode));
+  return Status::OK();
+}
+
+// When this function is called, we've finished processing the current build input
+// (either from the child ExecNode or from repartitioning a spilled partition). The build
+// rows have only been partitioned, we still need to build hash tables over them. Some
+// of the partitions could have already been spilled and attempting to build hash
+// tables over the non-spilled ones can cause them to spill.
+//
+// At the end of the function all partitions either have a hash table (and therefore are
+// not spilled) or are spilled. Partitions that have hash tables don't need to spill on
+// the probe side.
+//
+// This maps perfectly to a 0-1 knapsack where the weight is the memory to keep the
+// build rows and hash table and the value is the expected IO savings.
+// For now, we go with a greedy solution.
+//
+// TODO: implement the knapsack solution.
+Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
+  DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
+
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    Partition* partition = hash_partitions_[i];
+    if (partition->build_rows()->num_rows() == 0) {
+      // This partition is empty, no need to do anything else.
+      partition->Close(NULL);
+    } else if (partition->is_spilled()) {
+      // We don't need any build-side data for spilled partitions in memory.
+      RETURN_IF_ERROR(
+          partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+    }
+  }
+
+  // Allocate probe buffers for all partitions that are already spilled. Do this before
+  // building hash tables because allocating probe buffers may cause some more partitions
+  // to be spilled. This avoids wasted work on building hash tables for partitions that
+  // won't fit in memory alongside the required probe buffers.
+  RETURN_IF_ERROR(InitSpilledPartitionProbeStreams());
+
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    Partition* partition = hash_partitions_[i];
+    if (partition->IsClosed() || partition->is_spilled()) continue;
+
+    bool built = false;
+    DCHECK(partition->build_rows()->is_pinned());
+    RETURN_IF_ERROR(partition->BuildHashTable(&built));
+    // If we did not have enough memory to build this hash table, we need to spill this
+    // partition (clean up the hash table, unpin build).
+    if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL));
+  }
+
+  // We may have spilled additional partitions while building hash tables, we need to
+  // initialize probe buffers for them.
+  // TODO: once we have reliable reservations (IMPALA-3200) this should no longer be
+  // necessary: we will know exactly how many partitions will fit in memory and we can
+  // avoid building then immediately destroying hash tables.
+  RETURN_IF_ERROR(InitSpilledPartitionProbeStreams());
+
+  // TODO: at this point we could have freed enough memory to pin and build some
+  // spilled partitions. This can happen, for example if there is a lot of skew.
+  // Partition 1: 10GB (pinned initially).
+  // Partition 2,3,4: 1GB (spilled during partitioning the build).
+  // In the previous step, we could have unpinned 10GB (because there was not enough
+  // memory to build a hash table over it) which can now free enough memory to
+  // build hash tables over the remaining 3 partitions.
+  // We start by spilling the largest partition though so the build input would have
+  // to be pretty pathological.
+  // Investigate if this is worthwhile.
+  return Status::OK();
+}
+
+Status PhjBuilder::InitSpilledPartitionProbeStreams() {
+  DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
+
+  int num_spilled_partitions = 0;
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    Partition* partition = hash_partitions_[i];
+    if (!partition->IsClosed() && partition->is_spilled()) ++num_spilled_partitions;
+  }
+  int probe_streams_to_create =
+      num_spilled_partitions - spilled_partition_probe_streams_.size();
+
+  while (probe_streams_to_create > 0) {
+    // Create stream in vector, so that it will be cleaned up after any failure.
+    spilled_partition_probe_streams_.emplace_back(std::make_unique<BufferedTupleStream>(
+        runtime_state_, probe_row_desc_, runtime_state_->block_mgr(), block_mgr_client_,
+        false /* use_initial_small_buffers */, false /* read_write */));
+    BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get();
+    RETURN_IF_ERROR(probe_stream->Init(join_node_id_, profile(), false));
+
+    // Loop until either the stream gets a buffer or all partitions are spilled (in which
+    // case SpillPartition() returns an error).
+    while (true) {
+      bool got_buffer;
+      RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer));
+      if (got_buffer) break;
+
+      RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL));
+      ++probe_streams_to_create;
+    }
+    --probe_streams_to_create;
+  }
+  return Status::OK();
+}
+
+vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() {
+  return std::move(spilled_partition_probe_streams_);
+}
+
+void PhjBuilder::CloseAndDeletePartitions() {
+  // Close all the partitions and clean up all references to them.
+  for (unique_ptr<Partition>& partition : all_partitions_) partition->Close(NULL);
+  all_partitions_.clear();
+  hash_partitions_.clear();
+  null_aware_partition_ = NULL;
+  for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) {
+    stream->Close();
+  }
+  spilled_partition_probe_streams_.clear();
+}
+
+void PhjBuilder::AllocateRuntimeFilters() {
+  DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filters_.size() == 0)
+      << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
+  DCHECK(ht_ctx_ != NULL);
+  for (int i = 0; i < filters_.size(); ++i) {
+    filters_[i].local_bloom_filter =
+        runtime_state_->filter_bank()->AllocateScratchBloomFilter(
+            filters_[i].filter->id());
+  }
+}
+
+void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
+  int32_t num_enabled_filters = 0;
+  // Use 'num_build_rows' to estimate FP-rate of each Bloom filter, and publish
+  // 'always-true' filters if it's too high. Doing so saves CPU at the coordinator,
+  // serialisation time, and reduces the cost of applying the filter at the scan - most
+  // significantly for per-row filters. However, the number of build rows could be a very
+  // poor estimate of the NDV - particularly if the filter expression is a function of
+  // several columns.
+  // TODO: Better heuristic.
+  for (const FilterContext& ctx : filters_) {
+    // TODO: Consider checking actual number of bits set in filter to compute FP rate.
+    // TODO: Consider checking this every few batches or so.
+    bool fp_rate_too_high = runtime_state_->filter_bank()->FpRateTooHigh(
+        ctx.filter->filter_size(), num_build_rows);
+    runtime_state_->filter_bank()->UpdateFilterFromLocal(ctx.filter->id(),
+        fp_rate_too_high ? BloomFilter::ALWAYS_TRUE_FILTER : ctx.local_bloom_filter);
+
+    num_enabled_filters += !fp_rate_too_high;
+  }
+
+  if (filters_.size() > 0) {
+    string info_string;
+    if (num_enabled_filters == filters_.size()) {
+      info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(),
+          filters_.size() == 1 ? "" : "s");
+    } else {
+      info_string = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
+          num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s",
+          filters_.size() - num_enabled_filters);
+    }
+    profile()->AddInfoString("Runtime filters", info_string);
+  }
+}
+
+Status PhjBuilder::RepartitionBuildInput(
+    Partition* input_partition, int level, BufferedTupleStream* input_probe_rows) {
+  DCHECK_GE(level, 1);
+  SCOPED_TIMER(repartition_timer_);
+  COUNTER_ADD(num_repartitions_, 1);
+  RuntimeState* state = runtime_state_;
+
+  // Setup the read buffer and the new partitions.
+  BufferedTupleStream* build_rows = input_partition->build_rows();
+  DCHECK(build_rows != NULL);
+  bool got_read_buffer;
+  RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer));
+  if (!got_read_buffer) {
+    return mem_tracker()->MemLimitExceeded(
+        state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, join_node_id_));
+  }
+  RETURN_IF_ERROR(CreateHashPartitions(level));
+
+  // Repartition 'input_stream' into 'hash_partitions_'.
+  RowBatch build_batch(row_desc_, state->batch_size(), mem_tracker());
+  bool eos = false;
+  while (!eos) {
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(state->CheckQueryState());
+
+    RETURN_IF_ERROR(build_rows->GetNext(&build_batch, &eos));
+    RETURN_IF_ERROR(Send(state, &build_batch));
+    build_batch.Reset();
+  }
+
+  // Done reading the input, we can safely close it now to free memory.
+  input_partition->Close(NULL);
+
+  // We just freed up the buffer used for reading build rows. Ensure a buffer is
+  // allocated for reading probe rows before we build the hash tables in FlushFinal().
+  // TODO: once we have reliable reservations (IMPALA-3200) we can just hand off the
+  // reservation and avoid this complication.
+  while (true) {
+    bool got_buffer;
+    input_probe_rows->PrepareForRead(true, &got_buffer);
+    if (got_buffer) break;
+    RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+  }
+
+  RETURN_IF_ERROR(FlushFinal(state));
+  return Status::OK();
+}
+
+int64_t PhjBuilder::LargestPartitionRows() const {
+  int64_t max_rows = 0;
+  for (int i = 0; i < hash_partitions_.size(); ++i) {
+    Partition* partition = hash_partitions_[i];
+    DCHECK(partition != NULL);
+    if (partition->IsClosed()) continue;
+    int64_t rows = partition->build_rows()->num_rows();
+    if (rows > max_rows) max_rows = rows;
+  }
+  return max_rows;
+}
+
+bool PhjBuilder::HashTableStoresNulls() const {
+  return join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN
+      || join_op_ == TJoinOp::FULL_OUTER_JOIN
+      || std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(),
+             false, std::logical_or<bool>());
+}
+
+PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level)
+  : parent_(parent), is_spilled_(false), level_(level) {
+  // If we're repartitioning, we can assume the build input is fairly large and small
+  // buffers will most likely just waste memory.
+  bool use_initial_small_buffers = level == 0;
+  build_rows_ =
+      std::make_unique<BufferedTupleStream>(state, parent_->row_desc_, state->block_mgr(),
+          parent_->block_mgr_client_, use_initial_small_buffers, false /* read_write */);
+}
+
+PhjBuilder::Partition::~Partition() {
+  DCHECK(IsClosed());
+}
+
+int64_t PhjBuilder::Partition::EstimatedInMemSize() const {
+  return build_rows_->byte_size() + HashTable::EstimateSize(build_rows_->num_rows());
+}
+
+void PhjBuilder::Partition::Close(RowBatch* batch) {
+  if (IsClosed()) return;
+
+  if (hash_tbl_ != NULL) {
+    COUNTER_ADD(parent_->num_hash_collisions_, hash_tbl_->NumHashCollisions());
+    hash_tbl_->Close();
+  }
+
+  // Transfer ownership of build_rows_ to batch if batch is not NULL.
+  // Otherwise, close the stream here.
+  if (build_rows_ != NULL) {
+    if (batch == NULL) {
+      build_rows_->Close();
+    } else {
+      batch->AddTupleStream(build_rows_.release());
+    }
+    build_rows_.reset();
+  }
+}
+
+Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) {
+  DCHECK(!IsClosed());
+  // Close the hash table as soon as possible to release memory.
+  if (hash_tbl() != NULL) {
+    hash_tbl_->Close();
+    hash_tbl_.reset();
+  }
+
+  // Unpin the stream as soon as possible to increase the chances that the
+  // SwitchToIoBuffers() call below will succeed.
+  RETURN_IF_ERROR(build_rows_->UnpinStream(mode));
+
+  if (build_rows_->using_small_buffers()) {
+    bool got_buffer;
+    RETURN_IF_ERROR(build_rows_->SwitchToIoBuffers(&got_buffer));
+    if (!got_buffer) {
+      // We'll try again to get the buffers when the stream fills up the small buffers.
+      VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition "
+                 << this << " of join=" << parent_->join_node_id_
+                 << " build small buffers=" << build_rows_->using_small_buffers();
+      VLOG_FILE << GetStackTrace();
+    }
+  }
+
+  if (!is_spilled_) {
+    COUNTER_ADD(parent_->num_spilled_partitions_, 1);
+    if (parent_->num_spilled_partitions_->value() == 1) {
+      parent_->profile()->AppendExecOption("Spilled");
+    }
+  }
+  is_spilled_ = true;
+  return Status::OK();
+}
+
+Status PhjBuilder::Partition::BuildHashTable(bool* built) {
+  SCOPED_TIMER(parent_->build_hash_table_timer_);
+  DCHECK(build_rows_ != NULL);
+  *built = false;
+
+  // Before building the hash table, we need to pin the rows in memory.
+  RETURN_IF_ERROR(build_rows_->PinStream(false, built));
+  if (!*built) return Status::OK();
+
+  RuntimeState* state = parent_->runtime_state_;
+  HashTableCtx* ctx = parent_->ht_ctx_.get();
+  ctx->set_level(level()); // Set the hash function for building the hash table.
+  RowBatch batch(parent_->row_desc_, state->batch_size(), parent_->mem_tracker());
+  vector<BufferedTupleStream::RowIdx> indices;
+  bool eos = false;
+
+  // Allocate the partition-local hash table. Initialize the number of buckets based on
+  // the number of build rows (the number of rows is known at this point). This assumes
+  // there are no duplicates which can be wrong. However, the upside in the common case
+  // (few/no duplicates) is large and the downside when there are is low (a bit more
+  // memory; the bucket memory is small compared to the memory needed for all the build
+  // side allocations).
+  // One corner case is if the stream contains tuples with zero footprint (no materialized
+  // slots). If the tuples occupy no space, this implies all rows will be duplicates, so
+  // create a small hash table, IMPALA-2256.
+  //
+  // TODO: Try to allocate the hash table before pinning the stream to avoid needlessly
+  // reading all of the spilled rows from disk when we won't succeed anyway.
+  int64_t estimated_num_buckets = build_rows()->RowConsumesMemory() ?
+      HashTable::EstimateNumBuckets(build_rows()->num_rows()) :
+      state->batch_size() * 2;
+  hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_,
+      true /* store_duplicates */, parent_->row_desc_.tuple_descriptors().size(),
+      build_rows(), 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets));
+  if (!hash_tbl_->Init()) goto not_built;
+
+  bool got_read_buffer;
+  RETURN_IF_ERROR(build_rows_->PrepareForRead(false, &got_read_buffer));
+  DCHECK(got_read_buffer) << "Stream was already pinned.";
+  do {
+    RETURN_IF_ERROR(build_rows_->GetNext(&batch, &eos, &indices));
+    DCHECK_EQ(batch.num_rows(), indices.size());
+    DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
+        << build_rows()->RowConsumesMemory();
+    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+    if (parent_->insert_batch_fn_ != NULL) {
+      InsertBatchFn insert_batch_fn;
+      if (level() == 0) {
+        insert_batch_fn = parent_->insert_batch_fn_level0_;
+      } else {
+        insert_batch_fn = parent_->insert_batch_fn_;
+      }
+      DCHECK(insert_batch_fn != NULL);
+      if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) {
+        goto not_built;
+      }
+    } else {
+      if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) goto not_built;
+    }
+    RETURN_IF_ERROR(state->GetQueryStatus());
+    // Free any local allocations made while inserting.
+    ExprContext::FreeLocalAllocations(parent_->expr_ctxs_to_free_);
+    batch.Reset();
+  } while (!eos);
+
+  // The hash table fits in memory and is built.
+  DCHECK(*built);
+  DCHECK(hash_tbl_ != NULL);
+  is_spilled_ = false;
+  COUNTER_ADD(parent_->num_hash_buckets_, hash_tbl_->num_buckets());
+  return Status::OK();
+
+not_built:
+  *built = false;
+  if (hash_tbl_ != NULL) {
+    hash_tbl_->Close();
+    hash_tbl_.reset();
+  }
+  return Status::OK();
+}
+
+void PhjBuilder::Codegen(RuntimeState* state) {
+  bool build_codegen_enabled = false;
+  bool insert_codegen_enabled = false;
+  Status build_codegen_status, insert_codegen_status;
+  if (state->codegen_enabled()) {
+    Status codegen_status;
+    // Codegen for hashing rows with the builder's hash table context.
+    Function* hash_fn;
+    codegen_status = ht_ctx_->CodegenHashRow(runtime_state_, false, &hash_fn);
+    Function* murmur_hash_fn;
+    codegen_status.MergeStatus(
+        ht_ctx_->CodegenHashRow(runtime_state_, true, &murmur_hash_fn));
+
+    // Codegen for evaluating build rows
+    Function* eval_build_row_fn;
+    codegen_status.MergeStatus(
+        ht_ctx_->CodegenEvalRow(runtime_state_, true, &eval_build_row_fn));
+
+    if (codegen_status.ok()) {
+      build_codegen_status =
+          CodegenProcessBuildBatch(hash_fn, murmur_hash_fn, eval_build_row_fn);
+      insert_codegen_status =
+          CodegenInsertBatch(hash_fn, murmur_hash_fn, eval_build_row_fn);
+    } else {
+      build_codegen_status = codegen_status;
+      insert_codegen_status = codegen_status;
+    }
+    build_codegen_enabled = build_codegen_status.ok();
+    insert_codegen_enabled = insert_codegen_status.ok();
+  }
+  profile()->AddCodegenMsg(build_codegen_enabled, build_codegen_status, "Build Side");
+  profile()->AddCodegenMsg(
+      insert_codegen_enabled, insert_codegen_status, "Hash Table Construction");
+}
+
+string PhjBuilder::DebugString() const {
+  stringstream ss;
+  ss << "Hash partitions: " << hash_partitions_.size() << ":" << endl;
+  for (int i = 0; i < hash_partitions_.size(); ++i) {
+    Partition* partition = hash_partitions_[i];
+    ss << " Hash partition " << i << " ptr=" << partition;
+    if (partition->IsClosed()) {
+      ss << " Closed";
+      continue;
+    }
+    if (partition->is_spilled()) {
+      ss << " Spilled";
+    }
+    DCHECK(partition->build_rows() != NULL);
+    ss << endl
+       << "    Build Rows: " << partition->build_rows()->num_rows()
+       << " (Blocks pinned: " << partition->build_rows()->blocks_pinned() << ")" << endl;
+    if (partition->hash_tbl() != NULL) {
+      ss << "    Hash Table Rows: " << partition->hash_tbl()->size() << endl;
+    }
+  }
+  return ss.str();
+}
+
+Status PhjBuilder::CodegenProcessBuildBatch(
+    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
+  LlvmCodeGen* codegen;
+  RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
+
+  Function* process_build_batch_fn =
+      codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true);
+  DCHECK(process_build_batch_fn != NULL);
+
+  // Replace call sites
+  int replaced =
+      codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn, "EvalBuildRow");
+  DCHECK_EQ(replaced, 1);
+
+  // Replace some hash table parameters with constants.
+  HashTableCtx::HashTableReplacedConstants replaced_constants;
+  const bool stores_duplicates = true;
+  const int num_build_tuples = row_desc_.tuple_descriptors().size();
+  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(runtime_state_, stores_duplicates,
+      num_build_tuples, process_build_batch_fn, &replaced_constants));
+  DCHECK_GE(replaced_constants.stores_nulls, 1);
+  DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
+  DCHECK_EQ(replaced_constants.stores_duplicates, 0);
+  DCHECK_EQ(replaced_constants.stores_tuples, 0);
+  DCHECK_EQ(replaced_constants.quadratic_probing, 0);
+
+  Function* process_build_batch_fn_level0 =
+      codegen->CloneFunction(process_build_batch_fn);
+
+  // Always build runtime filters at level0 (if there are any).
+  // Note that the first argument of this function is the return value.
+  Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 4);
+  build_filters_l0_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));
+
+  // process_build_batch_fn_level0 uses CRC hash if available,
+  replaced =
+      codegen->ReplaceCallSites(process_build_batch_fn_level0, hash_fn, "HashRow");
+  DCHECK_EQ(replaced, 1);
+
+  // process_build_batch_fn uses murmur
+  replaced =
+      codegen->ReplaceCallSites(process_build_batch_fn, murmur_hash_fn, "HashRow");
+  DCHECK_EQ(replaced, 1);
+
+  // Never build filters after repartitioning, as all rows have already been added to the
+  // filters during the level0 build. Note that the first argument of this function is the
+  // return value.
+  Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 4);
+  build_filters_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt1Ty(codegen->context()), false));
+
+  // Finalize ProcessBuildBatch functions
+  process_build_batch_fn = codegen->FinalizeFunction(process_build_batch_fn);
+  if (process_build_batch_fn == NULL) {
+    return Status(
+        "Codegen'd PhjBuilder::ProcessBuildBatch() function "
+        "failed verification, see log");
+  }
+  process_build_batch_fn_level0 =
+      codegen->FinalizeFunction(process_build_batch_fn_level0);
+  if (process_build_batch_fn == NULL) {
+    return Status(
+        "Codegen'd level-zero PhjBuilder::ProcessBuildBatch() "
+        "function failed verification, see log");
+  }
+
+  // Register native function pointers
+  codegen->AddFunctionToJit(
+      process_build_batch_fn, reinterpret_cast<void**>(&process_build_batch_fn_));
+  codegen->AddFunctionToJit(process_build_batch_fn_level0,
+      reinterpret_cast<void**>(&process_build_batch_fn_level0_));
+  return Status::OK();
+}
+
+Status PhjBuilder::CodegenInsertBatch(
+    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
+  LlvmCodeGen* codegen;
+  RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
+
+  Function* insert_batch_fn = codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true);
+  Function* build_equals_fn;
+  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(runtime_state_, true, &build_equals_fn));
+
+  // Replace the parameter 'prefetch_mode' with constant.
+  Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
+  TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
+  DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
+  DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
+  prefetch_mode_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
+  // Use codegen'd EvalBuildRow() function
+  int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
+  DCHECK_EQ(replaced, 1);
+
+  // Use codegen'd Equals() function
+  replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals");
+  DCHECK_EQ(replaced, 1);
+
+  // Replace hash-table parameters with constants.
+  HashTableCtx::HashTableReplacedConstants replaced_constants;
+  const bool stores_duplicates = true;
+  const int num_build_tuples = row_desc_.tuple_descriptors().size();
+  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(runtime_state_, stores_duplicates,
+      num_build_tuples, insert_batch_fn, &replaced_constants));
+  DCHECK_GE(replaced_constants.stores_nulls, 1);
+  DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
+  DCHECK_GE(replaced_constants.stores_duplicates, 1);
+  DCHECK_GE(replaced_constants.stores_tuples, 1);
+  DCHECK_GE(replaced_constants.quadratic_probing, 1);
+
+  Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn);
+
+  // Use codegen'd hash functions
+  replaced = codegen->ReplaceCallSites(insert_batch_fn_level0, hash_fn, "HashRow");
+  DCHECK_EQ(replaced, 1);
+  replaced = codegen->ReplaceCallSites(insert_batch_fn, murmur_hash_fn, "HashRow");
+  DCHECK_EQ(replaced, 1);
+
+  insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn);
+  if (insert_batch_fn == NULL) {
+    return Status(
+        "PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd "
+        "InsertBatch() function failed verification, see log");
+  }
+  insert_batch_fn_level0 = codegen->FinalizeFunction(insert_batch_fn_level0);
+  if (insert_batch_fn_level0 == NULL) {
+    return Status(
+        "PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd zero-level "
+        "InsertBatch() function failed verification, see log");
+  }
+
+  codegen->AddFunctionToJit(insert_batch_fn, reinterpret_cast<void**>(&insert_batch_fn_));
+  codegen->AddFunctionToJit(
+      insert_batch_fn_level0, reinterpret_cast<void**>(&insert_batch_fn_level0_));
+  return Status::OK();
+}