You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/29 00:32:34 UTC

[2/6] incubator-impala git commit: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index f634193..46b91ba 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -31,10 +31,7 @@
 #include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
-#include "runtime/runtime-filter.h"
-#include "runtime/runtime-filter-bank.h"
 #include "runtime/runtime-state.h"
-#include "util/bloom-filter.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 
@@ -44,35 +41,26 @@
 
 DEFINE_bool(enable_phj_probe_side_filtering, true, "Deprecated.");
 
-const string PREPARE_FOR_READ_FAILED_ERROR_MSG = "Failed to acquire initial read buffer "
-    "for stream in hash join node $0. Reducing query concurrency or increasing the "
-    "memory limit may help this query to complete successfully.";
+static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
+    "Failed to acquire initial read buffer for stream in hash join node $0. Reducing "
+    "query concurrency or increasing the memory limit may help this query to complete "
+    "successfully.";
 
 using namespace impala;
 using namespace llvm;
 using namespace strings;
+using std::unique_ptr;
 
 PartitionedHashJoinNode::PartitionedHashJoinNode(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : BlockingJoinNode("PartitionedHashJoinNode", tnode.hash_join_node.join_op,
-        pool, tnode, descs),
-    is_not_distinct_from_(),
-    block_mgr_client_(NULL),
-    partition_build_timer_(NULL),
+  : BlockingJoinNode(
+        "PartitionedHashJoinNode", tnode.hash_join_node.join_op, pool, tnode, descs),
+    num_probe_rows_partitioned_(NULL),
     null_aware_eval_timer_(NULL),
     state_(PARTITIONING_BUILD),
-    partition_pool_(new ObjectPool()),
-    input_partition_(NULL),
-    null_aware_partition_(NULL),
-    non_empty_build_(false),
-    null_probe_rows_(NULL),
     null_probe_output_idx_(-1),
-    process_build_batch_fn_(NULL),
-    process_build_batch_fn_level0_(NULL),
     process_probe_batch_fn_(NULL),
-    process_probe_batch_fn_level0_(NULL),
-    insert_batch_fn_(NULL),
-    insert_batch_fn_level0_(NULL) {
+    process_probe_batch_fn_level0_(NULL) {
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
 }
 
@@ -86,35 +74,22 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
   DCHECK(tnode.__isset.hash_join_node);
   const vector<TEqJoinCondition>& eq_join_conjuncts =
       tnode.hash_join_node.eq_join_conjuncts;
-  for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
+  // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
+  // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
+  // being separated out further.
+  builder_.reset(
+      new PhjBuilder(id(), join_op_, child(0)->row_desc(), child(1)->row_desc(), state));
+  RETURN_IF_ERROR(builder_->Init(state, eq_join_conjuncts, tnode.runtime_filters));
+
+  for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
     ExprContext* ctx;
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjuncts[i].left, &ctx));
+    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjunct.left, &ctx));
     probe_expr_ctxs_.push_back(ctx);
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjuncts[i].right, &ctx));
+    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjunct.right, &ctx));
     build_expr_ctxs_.push_back(ctx);
-    is_not_distinct_from_.push_back(eq_join_conjuncts[i].is_not_distinct_from);
   }
-  RETURN_IF_ERROR(
-      Expr::CreateExprTrees(pool_, tnode.hash_join_node.other_join_conjuncts,
-                            &other_join_conjunct_ctxs_));
-
-  for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) {
-    // If filter propagation not enabled, only consider building broadcast joins (that may
-    // be consumed by this fragment).
-    if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL &&
-        !filter.is_broadcast_join) {
-      continue;
-    }
-    if (state->query_options().disable_row_runtime_filtering
-        && !filter.applied_on_partition_columns) {
-      continue;
-    }
-    FilterContext filter_ctx;
-    filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, true);
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, filter.src_expr, &filter_ctx.expr));
-    filters_.push_back(filter_ctx);
-  }
-
+  RETURN_IF_ERROR(Expr::CreateExprTrees(
+      pool_, tnode.hash_join_node.other_join_conjuncts, &other_join_conjunct_ctxs_));
   DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
       eq_join_conjuncts.size() == 1);
   return Status::OK();
@@ -134,20 +109,17 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   runtime_state_ = state;
 
+  RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
+  runtime_profile()->PrependChild(builder_->profile());
+
   // build and probe exprs are evaluated in the context of the rows produced by our
   // right and left children, respectively
   RETURN_IF_ERROR(
       Expr::Prepare(build_expr_ctxs_, state, child(1)->row_desc(), expr_mem_tracker()));
   RETURN_IF_ERROR(
       Expr::Prepare(probe_expr_ctxs_, state, child(0)->row_desc(), expr_mem_tracker()));
-  for (const FilterContext& ctx: filters_) {
-    RETURN_IF_ERROR(ctx.expr->Prepare(state, child(1)->row_desc(), expr_mem_tracker()));
-    AddExprCtxToFree(ctx.expr);
-  }
 
-  // Although ProcessBuildInput() may be run in a separate thread, it is safe to free
-  // local allocations in QueryMaintenance() since the build thread is not run
-  // concurrently with other expr evaluation in this join node.
+  // Build expressions may be evaluated during probing, so must be freed.
   // Probe side expr is not included in QueryMaintenance(). We cache the probe expression
   // values in ExprValuesCache. Local allocations need to survive until the cache is reset
   // so we need to manually free probe expr local allocations.
@@ -161,84 +133,25 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
       Expr::Prepare(other_join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker()));
   AddExprCtxsToFree(other_join_conjunct_ctxs_);
 
-  RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
-      Substitute("PartitionedHashJoinNode id=$0 ptr=$1", id_, this),
-      MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_));
-
-  const bool should_store_nulls = join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
-      join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN ||
-      std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), false,
-                      std::logical_or<bool>());
   RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, probe_expr_ctxs_,
-      should_store_nulls, is_not_distinct_from_, state->fragment_hash_seed(),
-      MAX_PARTITION_DEPTH, child(1)->row_desc().tuple_descriptors().size(), mem_tracker(),
-      &ht_ctx_));
+      builder_->HashTableStoresNulls(), builder_->is_not_distinct_from(),
+      state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
+      child(1)->row_desc().tuple_descriptors().size(), mem_tracker(), &ht_ctx_));
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime");
   }
 
-  partition_build_timer_ = ADD_TIMER(runtime_profile(), "BuildPartitionTime");
-  num_hash_buckets_ =
-      ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
-  partitions_created_ =
-      ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
-  max_partition_level_ = runtime_profile()->AddHighWaterMarkCounter(
-      "MaxPartitionLevel", TUnit::UNIT);
-  num_build_rows_partitioned_ =
-      ADD_COUNTER(runtime_profile(), "BuildRowsPartitioned", TUnit::UNIT);
   num_probe_rows_partitioned_ =
       ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
-  num_repartitions_ =
-      ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
-  num_spilled_partitions_ =
-      ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT);
-  largest_partition_percent_ = runtime_profile()->AddHighWaterMarkCounter(
-      "LargestPartitionPercent", TUnit::UNIT);
-  num_hash_collisions_ =
-      ADD_COUNTER(runtime_profile(), "HashCollisions", TUnit::UNIT);
-
-  bool build_codegen_enabled = false;
+
   bool probe_codegen_enabled = false;
-  bool ht_construction_codegen_enabled = false;
-  Status codegen_status;
-  Status build_codegen_status;
   Status probe_codegen_status;
-  Status insert_codegen_status;
   if (state->codegen_enabled()) {
-    // Codegen for hashing rows
-    Function* hash_fn;
-    codegen_status = ht_ctx_->CodegenHashRow(state, false, &hash_fn);
-    Function* murmur_hash_fn;
-    codegen_status.MergeStatus(ht_ctx_->CodegenHashRow(state, true, &murmur_hash_fn));
-
-    // Codegen for evaluating build rows
-    Function* eval_build_row_fn;
-    codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_build_row_fn));
-
-    if (codegen_status.ok()) {
-      // Codegen for build path
-      build_codegen_status =
-          CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_build_row_fn);
-      if (build_codegen_status.ok()) build_codegen_enabled = true;
-      // Codegen for probe path
-      probe_codegen_status = CodegenProcessProbeBatch(state, hash_fn, murmur_hash_fn);
-      if (probe_codegen_status.ok()) probe_codegen_enabled = true;
-      // Codegen for InsertBatch()
-      insert_codegen_status = CodegenInsertBatch(state, hash_fn, murmur_hash_fn,
-          eval_build_row_fn);
-      if (insert_codegen_status.ok()) ht_construction_codegen_enabled = true;
-    } else {
-      build_codegen_status = codegen_status;
-      probe_codegen_status = codegen_status;
-      insert_codegen_status = codegen_status;
-    }
+    probe_codegen_status = CodegenProcessProbeBatch(state);
+    probe_codegen_enabled = probe_codegen_status.ok();
   }
   runtime_profile()->AddCodegenMsg(
-      build_codegen_enabled, codegen_status, "Build Side");
-  runtime_profile()->AddCodegenMsg(
-      probe_codegen_enabled, codegen_status, "Probe Side");
-  runtime_profile()->AddCodegenMsg(
-      ht_construction_codegen_enabled, codegen_status, "Hash Table Construction");
+      probe_codegen_enabled, probe_codegen_status, "Probe Side");
   return Status::OK();
 }
 
@@ -248,19 +161,10 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
   RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state));
   RETURN_IF_ERROR(Expr::Open(other_join_conjunct_ctxs_, state));
-  for (const FilterContext& filter: filters_) RETURN_IF_ERROR(filter.expr->Open(state));
-  AllocateRuntimeFilters(state);
 
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-    null_aware_partition_ = partition_pool_->Add(new Partition(state, this, 0));
-    RETURN_IF_ERROR(
-        null_aware_partition_->build_rows()->Init(id(), runtime_profile(), false));
-    RETURN_IF_ERROR(
-        null_aware_partition_->probe_rows()->Init(id(), runtime_profile(), false));
-    null_probe_rows_ = new BufferedTupleStream(
-        state, child(0)->row_desc(), state->block_mgr(), block_mgr_client_,
-        true /* use_initial_small_buffers */, false /* read_write */ );
-    RETURN_IF_ERROR(null_probe_rows_->Init(id(), runtime_profile(), false));
+    RETURN_IF_ERROR(InitNullAwareProbePartition());
+    RETURN_IF_ERROR(InitNullProbeRows());
   }
 
   // Check for errors and free local allocations before opening children.
@@ -269,489 +173,116 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   // The prepare functions of probe expressions may have done local allocations implicitly
   // (e.g. calling UdfBuiltins::Lower()). The probe expressions' local allocations need to
   // be freed now as they don't get freed again till probing. Other exprs' local allocations
-  // are freed in ExecNode::FreeLocalAllocations() in ProcessBuildInput().
+  // are freed in ExecNode::FreeLocalAllocations().
   ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
 
-  RETURN_IF_ERROR(BlockingJoinNode::ConstructBuildAndOpenProbe(state, NULL));
+  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
+  RETURN_IF_ERROR(PrepareForProbe());
+
+  UpdateState(PARTITIONING_PROBE);
   RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
   ResetForProbe();
-  DCHECK(null_aware_partition_ == NULL || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
+  DCHECK(null_aware_probe_partition_ == NULL
+      || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
   return Status::OK();
 }
 
 Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-    non_empty_build_ = false;
     null_probe_output_idx_ = -1;
     matched_null_probe_.clear();
     nulls_build_batch_.reset();
   }
   state_ = PARTITIONING_BUILD;
   ht_ctx_->set_level(0);
-  ClosePartitions();
+  CloseAndDeletePartitions();
+  builder_->Reset();
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
   return ExecNode::Reset(state);
 }
 
-void PartitionedHashJoinNode::ClosePartitions() {
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    hash_partitions_[i]->Close(NULL);
+Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state) {
+  DCHECK(false) << "Should not be called, PHJ uses the BuildSink API";
+  return Status::OK();
+}
+
+void PartitionedHashJoinNode::CloseAndDeletePartitions() {
+  // Close all the partitions and clean up all references to them.
+  for (unique_ptr<ProbePartition>& partition : probe_hash_partitions_) {
+    if (partition != NULL) partition->Close(NULL);
   }
-  hash_partitions_.clear();
-  for (list<Partition*>::iterator it = spilled_partitions_.begin();
-      it != spilled_partitions_.end(); ++it) {
-    (*it)->Close(NULL);
+  probe_hash_partitions_.clear();
+  for (unique_ptr<ProbePartition>& partition : spilled_partitions_) {
+    partition->Close(NULL);
   }
   spilled_partitions_.clear();
-  for (list<Partition*>::iterator it = output_build_partitions_.begin();
-      it != output_build_partitions_.end(); ++it) {
-    (*it)->Close(NULL);
-  }
-  output_build_partitions_.clear();
   if (input_partition_ != NULL) {
     input_partition_->Close(NULL);
-    input_partition_ = NULL;
+    input_partition_.reset();
   }
-  if (null_aware_partition_ != NULL) {
-    null_aware_partition_->Close(NULL);
-    null_aware_partition_ = NULL;
+  if (null_aware_probe_partition_ != NULL) {
+    null_aware_probe_partition_->Close(NULL);
+    null_aware_probe_partition_.reset();
   }
+  output_build_partitions_.clear();
   if (null_probe_rows_ != NULL) {
     null_probe_rows_->Close();
-    delete null_probe_rows_;
-    null_probe_rows_ = NULL;
+    null_probe_rows_.reset();
   }
-  partition_pool_->Clear();
 }
 
 void PartitionedHashJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  if (ht_ctx_.get() != NULL) ht_ctx_->Close();
-
+  if (ht_ctx_ != NULL) ht_ctx_->Close();
   nulls_build_batch_.reset();
-
-  ClosePartitions();
-
-  if (block_mgr_client_ != NULL) {
-    state->block_mgr()->ClearReservations(block_mgr_client_);
-  }
+  CloseAndDeletePartitions();
+  if (builder_ != NULL) builder_->Close(state);
   Expr::Close(build_expr_ctxs_, state);
   Expr::Close(probe_expr_ctxs_, state);
   Expr::Close(other_join_conjunct_ctxs_, state);
-  for (const FilterContext& ctx: filters_) {
-    ctx.expr->Close(state);
-  }
   BlockingJoinNode::Close(state);
 }
 
-PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
-    PartitionedHashJoinNode* parent, int level)
+PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state,
+    PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition,
+    unique_ptr<BufferedTupleStream> probe_rows)
   : parent_(parent),
-    is_closed_(false),
-    is_spilled_(false),
-    level_(level) {
-  build_rows_ = new BufferedTupleStream(state, parent_->child(1)->row_desc(),
-      state->block_mgr(), parent_->block_mgr_client_,
-      true /* use_initial_small_buffers */, false /* read_write */);
-  DCHECK(build_rows_ != NULL);
-  probe_rows_ = new BufferedTupleStream(state, parent_->child(0)->row_desc(),
-      state->block_mgr(), parent_->block_mgr_client_,
-      true /* use_initial_small_buffers */, false /* read_write */ );
-  DCHECK(probe_rows_ != NULL);
-}
-
-PartitionedHashJoinNode::Partition::~Partition() {
-  DCHECK(is_closed());
+    build_partition_(build_partition),
+    probe_rows_(std::move(probe_rows)) {
+  DCHECK(probe_rows_->has_write_block());
+  DCHECK(!probe_rows_->using_small_buffers());
+  DCHECK(!probe_rows_->is_pinned());
 }
 
-int64_t PartitionedHashJoinNode::Partition::EstimatedInMemSize() const {
-  return build_rows_->byte_size() + HashTable::EstimateSize(build_rows_->num_rows());
+PartitionedHashJoinNode::ProbePartition::~ProbePartition() {
+  DCHECK(IsClosed());
 }
 
-void PartitionedHashJoinNode::Partition::Close(RowBatch* batch) {
-  if (is_closed()) return;
-  is_closed_ = true;
-
-  if (hash_tbl_.get() != NULL) {
-    COUNTER_ADD(parent_->num_hash_collisions_, hash_tbl_->NumHashCollisions());
-    hash_tbl_->Close();
-  }
-
-  // Transfer ownership of build_rows_/probe_rows_ to batch if batch is not NULL.
-  // Otherwise, close the stream here.
-  if (build_rows_ != NULL) {
-    if (batch == NULL) {
-      build_rows_->Close();
-      delete build_rows_;
-    } else {
-      batch->AddTupleStream(build_rows_);
-    }
-    build_rows_ = NULL;
-  }
-  if (probe_rows_ != NULL) {
-    if (batch == NULL) {
-      probe_rows_->Close();
-      delete probe_rows_;
-    } else {
-      batch->AddTupleStream(probe_rows_);
-    }
-    probe_rows_ = NULL;
-  }
-}
-
-Status PartitionedHashJoinNode::Partition::Spill(bool unpin_all_build) {
-  DCHECK(!is_closed_);
-  // Spilling should occur before we start processing probe rows.
-  DCHECK(parent_->state_ != PROCESSING_PROBE &&
-         parent_->state_ != PROBING_SPILLED_PARTITION) << parent_->state_;
-  DCHECK((is_spilled_ && parent_->state_ == REPARTITIONING) ||
-         probe_rows_->num_rows() == 0);
-  // Close the hash table as soon as possible to release memory.
-  if (hash_tbl() != NULL) {
-    hash_tbl_->Close();
-    hash_tbl_.reset();
-  }
-
-  bool got_buffer = true;
-  if (build_rows_->using_small_buffers()) {
-    RETURN_IF_ERROR(build_rows_->SwitchToIoBuffers(&got_buffer));
-  }
-  // Unpin the stream as soon as possible to increase the chances that the
-  // SwitchToIoBuffers() call below will succeed.
-  RETURN_IF_ERROR(build_rows_->UnpinStream(unpin_all_build));
-
-  if (got_buffer && probe_rows_->using_small_buffers()) {
-    RETURN_IF_ERROR(probe_rows_->SwitchToIoBuffers(&got_buffer));
-  }
-  if (!got_buffer) {
-    // We'll try again to get the buffers when the stream fills up the small buffers.
-    VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition "
-               << this << " of join=" << parent_->id_ << " build small buffers="
-               << build_rows_->using_small_buffers() << " probe small buffers="
-               << probe_rows_->using_small_buffers();
-    VLOG_FILE << GetStackTrace();
-  }
-
-  if (!is_spilled_) {
-    COUNTER_ADD(parent_->num_spilled_partitions_, 1);
-    if (parent_->num_spilled_partitions_->value() == 1) {
-      parent_->runtime_profile()->AppendExecOption("Spilled");
-    }
-  }
-
-  is_spilled_ = true;
-  return Status::OK();
-}
-
-Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
-    bool* built) {
-  DCHECK(build_rows_ != NULL);
-  *built = false;
-
-  // TODO: estimate the entire size of the hash table and reserve all of it from
-  // the block mgr.
-
-  // We got the buffers we think we will need, try to build the hash table.
-  RETURN_IF_ERROR(build_rows_->PinStream(false, built));
-  if (!*built) return Status::OK();
+Status PartitionedHashJoinNode::ProbePartition::PrepareForRead() {
   bool got_read_buffer;
-  RETURN_IF_ERROR(build_rows_->PrepareForRead(false, &got_read_buffer));
-  DCHECK(got_read_buffer) << "Stream was already pinned.";
-
-  RowBatch batch(parent_->child(1)->row_desc(), state->batch_size(),
-      parent_->mem_tracker());
-  HashTableCtx* ctx = parent_->ht_ctx_.get();
-  // TODO: move the batch and indices as members to avoid reallocating.
-  vector<BufferedTupleStream::RowIdx> indices;
-  bool eos = false;
-
-  // Allocate the partition-local hash table. Initialize the number of buckets based on
-  // the number of build rows (the number of rows is known at this point). This assumes
-  // there are no duplicates which can be wrong. However, the upside in the common case
-  // (few/no duplicates) is large and the downside when there are is low (a bit more
-  // memory; the bucket memory is small compared to the memory needed for all the build
-  // side allocations).
-  // One corner case is if the stream contains tuples with zero footprint (no materialized
-  // slots). If the tuples occupy no space, this implies all rows will be duplicates, so
-  // create a small hash table, IMPALA-2256.
-  // We always start with small pages in the hash table.
-  int64_t estimated_num_buckets = build_rows()->RowConsumesMemory() ?
-      HashTable::EstimateNumBuckets(build_rows()->num_rows()) : state->batch_size() * 2;
-  hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_,
-      true /* store_duplicates */,
-      parent_->child(1)->row_desc().tuple_descriptors().size(), build_rows(),
-      1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets));
-  if (!hash_tbl_->Init()) goto not_built;
-
-  do {
-    RETURN_IF_ERROR(build_rows_->GetNext(&batch, &eos, &indices));
-    DCHECK_EQ(batch.num_rows(), indices.size());
-    DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
-        << build_rows()->RowConsumesMemory();
-    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
-    SCOPED_TIMER(parent_->build_timer_);
-    if (parent_->insert_batch_fn_ != NULL) {
-      InsertBatchFn insert_batch_fn;
-      if (ctx->level() == 0) {
-        insert_batch_fn = parent_->insert_batch_fn_level0_;
-      } else {
-        insert_batch_fn = parent_->insert_batch_fn_;
-      }
-      DCHECK(insert_batch_fn != NULL);
-      if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) {
-        goto not_built;
-      }
-    } else {
-      if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) {
-        goto not_built;
-      }
-    }
-    RETURN_IF_ERROR(state->GetQueryStatus());
-    parent_->FreeLocalAllocations();
-    batch.Reset();
-  } while (!eos);
-
-  // The hash table fits in memory and is built.
-  DCHECK(*built);
-  DCHECK(hash_tbl_.get() != NULL);
-  is_spilled_ = false;
-  COUNTER_ADD(parent_->num_hash_buckets_, hash_tbl_->num_buckets());
-  return Status::OK();
-
-not_built:
-  *built = false;
-  if (hash_tbl_.get() != NULL) {
-    hash_tbl_->Close();
-    hash_tbl_.reset();
-  }
-  return Status::OK();
-}
-
-void PartitionedHashJoinNode::AllocateRuntimeFilters(RuntimeState* state) {
-  DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filters_.size() == 0)
-      << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
-  DCHECK(ht_ctx_.get() != NULL);
-  for (int i = 0; i < filters_.size(); ++i) {
-    filters_[i].local_bloom_filter =
-        state->filter_bank()->AllocateScratchBloomFilter(filters_[i].filter->id());
-  }
-}
-
-void PartitionedHashJoinNode::PublishRuntimeFilters(RuntimeState* state,
-    int64_t total_build_rows) {
-  int32_t num_enabled_filters = 0;
-  // Use total_build_rows to estimate FP-rate of each Bloom filter, and publish
-  // 'always-true' filters if it's too high. Doing so saves CPU at the coordinator,
-  // serialisation time, and reduces the cost of applying the filter at the scan - most
-  // significantly for per-row filters. However, the number of build rows could be a very
-  // poor estimate of the NDV - particularly if the filter expression is a function of
-  // several columns.
-  // TODO: Better heuristic.
-  for (const FilterContext& ctx: filters_) {
-    // TODO: Consider checking actual number of bits set in filter to compute FP rate.
-    // TODO: Consider checking this every few batches or so.
-    bool fp_rate_too_high =
-        state->filter_bank()->FpRateTooHigh(ctx.filter->filter_size(), total_build_rows);
-    state->filter_bank()->UpdateFilterFromLocal(ctx.filter->id(),
-        fp_rate_too_high ? BloomFilter::ALWAYS_TRUE_FILTER : ctx.local_bloom_filter);
-
-    num_enabled_filters += !fp_rate_too_high;
-  }
-
-  if (filters_.size() > 0) {
-    if (num_enabled_filters == filters_.size()) {
-      runtime_profile()->AppendExecOption(
-          Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(),
-              filters_.size() == 1 ? "" : "s"));
-    } else {
-      string exec_option = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
-          num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s",
-          filters_.size() - num_enabled_filters);
-      runtime_profile()->AppendExecOption(exec_option);
-    }
-  }
-}
-
-bool PartitionedHashJoinNode::AppendRowStreamFull(BufferedTupleStream* stream,
-    TupleRow* row, Status* status) {
-  while (status->ok()) {
-    // Check if the stream is still using small buffers and try to switch to IO-buffers.
-    if (stream->using_small_buffers()) {
-      bool got_buffer;
-      *status = stream->SwitchToIoBuffers(&got_buffer);
-      if (!status->ok()) return false;
-      if (got_buffer) {
-        if (LIKELY(stream->AddRow(row, status))) return true;
-        if (!status->ok()) return false;
-      }
-    }
-    // We ran out of memory. Pick a partition to spill.
-    Partition* spilled_partition;
-    *status = SpillPartition(&spilled_partition);
-    if (!status->ok()) return false;
-    if (stream->AddRow(row, status)) return true;
-    // Spilling one partition does not guarantee we can append a row. Keep
-    // spilling until we can append this row.
-  }
-  return false;
-}
-
-// TODO: Can we do better with a different spilling heuristic?
-Status PartitionedHashJoinNode::SpillPartition(Partition** spilled_partition) {
-  int64_t max_freed_mem = 0;
-  int partition_idx = -1;
-  *spilled_partition = NULL;
-
-  // Iterate over the partitions and pick the largest partition to spill.
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* candidate = hash_partitions_[i];
-    if (candidate->is_closed()) continue;
-    if (candidate->is_spilled()) continue;
-    int64_t mem = candidate->build_rows()->bytes_in_mem(false);
-    // TODO: What should we do here if probe_rows()->num_rows() > 0 ? We should be able
-    // to spill INNER JOINS, but not many of the other joins.
-    if (candidate->hash_tbl() != NULL) {
-      // IMPALA-1488: Do not spill partitions that already had matches, because we
-      // are going to lose information and return wrong results.
-      if (UNLIKELY(candidate->hash_tbl()->HasMatches())) continue;
-      mem += candidate->hash_tbl()->ByteSize();
-    }
-    if (mem > max_freed_mem) {
-      max_freed_mem = mem;
-      partition_idx = i;
-    }
-  }
-
-  if (partition_idx == -1) {
-    // Could not find a partition to spill. This means the mem limit was just too low.
-    return runtime_state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id());
-  }
-
-  VLOG(2) << "Spilling partition: " << partition_idx << endl << NodeDebugString();
-  RETURN_IF_ERROR(hash_partitions_[partition_idx]->Spill(false));
-  DCHECK(hash_partitions_[partition_idx]->probe_rows()->has_write_block());
-  hash_tbls_[partition_idx] = NULL;
-  *spilled_partition = hash_partitions_[partition_idx];
-  return Status::OK();
-}
-
-Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state) {
-  // Do a full scan of child(1) and partition the rows.
-  {
-    SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
-    RETURN_IF_ERROR(child(1)->Open(state));
+  RETURN_IF_ERROR(probe_rows_->PrepareForRead(true, &got_read_buffer));
+  if (!got_read_buffer) {
+    return parent_->mem_tracker()->MemLimitExceeded(parent_->runtime_state_,
+        Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, parent_->id_));
   }
-  RETURN_IF_ERROR(ProcessBuildInput(state, 0));
-
-  UpdateState(PROCESSING_PROBE);
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level) {
-  if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
-    return Status(TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH, id_,
-        MAX_PARTITION_DEPTH);
-  }
-
-  DCHECK(hash_partitions_.empty());
-  if (input_partition_ != NULL) {
-    DCHECK(input_partition_->build_rows() != NULL);
-    DCHECK_EQ(input_partition_->build_rows()->blocks_pinned(), 0) << NodeDebugString();
-    bool got_read_buffer;
-    RETURN_IF_ERROR(
-        input_partition_->build_rows()->PrepareForRead(true, &got_read_buffer));
-    if (!got_read_buffer) {
-      return mem_tracker()->MemLimitExceeded(
-          state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
-    }
-  }
-
-  for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    Partition* new_partition = new Partition(state, this, level);
-    DCHECK(new_partition != NULL);
-    hash_partitions_.push_back(partition_pool_->Add(new_partition));
-    RETURN_IF_ERROR(new_partition->build_rows()->Init(id(), runtime_profile(), true));
-    // Initialize a buffer for the probe here to make sure why have it if we need it.
-    // While this is not strictly necessary (there are some cases where we won't need this
-    // buffer), the benefit is low. Not grabbing this buffer means there is an additional
-    // buffer that could be used for the build side. However since this is only one
-    // buffer, there is only a small range of build input sizes where this is beneficial
-    // (an IO buffer size). It makes the logic much more complex to enable this
-    // optimization.
-    RETURN_IF_ERROR(new_partition->probe_rows()->Init(id(), runtime_profile(), false));
-  }
-  COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
-  COUNTER_SET(max_partition_level_, level);
-
-  DCHECK_EQ(build_batch_->num_rows(), 0);
-  bool eos = false;
-  int64_t total_build_rows = 0;
-  while (!eos) {
-    RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(QueryMaintenance(state));
-    // 'probe_expr_ctxs_' should have made no local allocations in this function.
-    DCHECK(!ExprContext::HasLocalAllocations(probe_expr_ctxs_))
-        << Expr::DebugString(probe_expr_ctxs_);
-    if (input_partition_ == NULL) {
-      // If we are still consuming batches from the build side.
-      {
-        SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
-        RETURN_IF_ERROR(child(1)->GetNext(state, build_batch_.get(), &eos));
-      }
-      COUNTER_ADD(build_row_counter_, build_batch_->num_rows());
-    } else {
-      // If we are consuming batches that have already been partitioned.
-      RETURN_IF_ERROR(input_partition_->build_rows()->GetNext(build_batch_.get(), &eos));
-    }
-    total_build_rows += build_batch_->num_rows();
-
-    SCOPED_TIMER(partition_build_timer_);
-    if (process_build_batch_fn_ == NULL) {
-      bool build_filters = ht_ctx_->level() == 0;
-      RETURN_IF_ERROR(ProcessBuildBatch(build_batch_.get(), build_filters));
+void PartitionedHashJoinNode::ProbePartition::Close(RowBatch* batch) {
+  if (IsClosed()) return;
+  if (probe_rows_ != NULL) {
+    if (batch == NULL) {
+      probe_rows_->Close();
     } else {
-      DCHECK(process_build_batch_fn_level0_ != NULL);
-      if (ht_ctx_->level() == 0) {
-        RETURN_IF_ERROR(
-            process_build_batch_fn_level0_(this, build_batch_.get(), true));
-      } else {
-        RETURN_IF_ERROR(process_build_batch_fn_(this, build_batch_.get(), false));
-      }
+      batch->AddTupleStream(probe_rows_.release());
     }
-    build_batch_->Reset();
-    DCHECK(!build_batch_->AtCapacity());
+    probe_rows_.reset();
   }
-
-  if (ht_ctx_->level() == 0) PublishRuntimeFilters(state, total_build_rows);
-
-  if (input_partition_ != NULL) {
-    // Done repartitioning build input, close it now.
-    input_partition_->build_rows_->Close();
-    input_partition_->build_rows_ = NULL;
-  }
-
-  stringstream ss;
-  ss << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:", id(),
-            hash_partitions_[0]->level_, total_build_rows);
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    double percent =
-        partition->build_rows()->num_rows() * 100 / static_cast<double>(total_build_rows);
-    ss << "  " << i << " "  << (partition->is_spilled() ? "spilled" : "not spilled")
-       << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
-       << "    #rows:" << partition->build_rows()->num_rows() << endl;
-    COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(percent));
-  }
-  VLOG(2) << ss.str();
-
-  COUNTER_ADD(num_build_rows_partitioned_, total_build_rows);
-  non_empty_build_ |= (total_build_rows > 0);
-  RETURN_IF_ERROR(BuildHashTables(state));
-  return Status::OK();
 }
 
 Status PartitionedHashJoinNode::NextProbeRowBatch(
     RuntimeState* state, RowBatch* out_batch) {
+  DCHECK_EQ(state_, PARTITIONING_PROBE);
   DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
   do {
     // Loop until we find a non-empty row batch.
@@ -777,6 +308,7 @@ Status PartitionedHashJoinNode::NextProbeRowBatch(
 Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
     RuntimeState* state, RowBatch* out_batch) {
   DCHECK(input_partition_ != NULL);
+  DCHECK(state_ == PROBING_SPILLED_PARTITION || state_ == REPARTITIONING_PROBE);
   probe_batch_->TransferResourceOwnership(out_batch);
   if (out_batch->AtCapacity()) {
     // The out_batch has resources associated with it that will be recycled on the
@@ -788,114 +320,108 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
   if (LIKELY(probe_rows->rows_returned() < probe_rows->num_rows())) {
     // Continue from the current probe stream.
     bool eos = false;
-    RETURN_IF_ERROR(input_partition_->probe_rows()->GetNext(probe_batch_.get(), &eos));
+    RETURN_IF_ERROR(probe_rows->GetNext(probe_batch_.get(), &eos));
     DCHECK_GT(probe_batch_->num_rows(), 0);
     ResetForProbe();
   } else {
-    // Done with this partition.
-    if (!input_partition_->is_spilled() &&
-        (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
-         join_op_ == TJoinOp::FULL_OUTER_JOIN)) {
+    // Finished processing spilled probe rows from this partition.
+    if (state_ == PROBING_SPILLED_PARTITION && NeedToProcessUnmatchedBuildRows()) {
+      // If the build partition was in memory, we are done probing this partition.
       // In case of right-outer, right-anti and full-outer joins, we move this partition
       // to the list of partitions that we need to output their unmatched build rows.
       DCHECK(output_build_partitions_.empty());
-      DCHECK(input_partition_->hash_tbl_.get() != NULL) << " id: " << id_
-           << " Build: " << input_partition_->build_rows()->num_rows()
-           << " Probe: " << probe_rows->num_rows() << endl
-           << GetStackTrace();
+      DCHECK(input_partition_->build_partition()->hash_tbl() != NULL)
+          << " id: " << id_
+          << " Build: " << input_partition_->build_partition()->build_rows()->num_rows()
+          << " Probe: " << probe_rows->num_rows() << endl
+          << GetStackTrace();
       hash_tbl_iterator_ =
-          input_partition_->hash_tbl_->FirstUnmatched(ht_ctx_.get());
-      output_build_partitions_.push_back(input_partition_);
+          input_partition_->build_partition()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
+      output_build_partitions_.push_back(input_partition_->build_partition());
     } else {
-      // In any other case, just close the input partition.
-      input_partition_->Close(out_batch);
-      input_partition_ = NULL;
+      // In any other case, just close the input build partition.
+      input_partition_->build_partition()->Close(out_batch);
     }
+    input_partition_->Close(out_batch);
+    input_partition_.reset();
     current_probe_row_ = NULL;
     probe_batch_pos_ = -1;
   }
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::PrepareNextPartition(RuntimeState* state) {
+Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
+    RuntimeState* state, bool* got_partition) {
+  VLOG(2) << "PrepareSpilledPartitionForProbe\n" << NodeDebugString();
   DCHECK(input_partition_ == NULL);
-  if (spilled_partitions_.empty()) return Status::OK();
-  VLOG(2) << "PrepareNextPartition\n" << NodeDebugString();
+  DCHECK_EQ(builder_->num_hash_partitions(), 0);
+  DCHECK(probe_hash_partitions_.empty());
+  if (spilled_partitions_.empty()) {
+    *got_partition = false;
+    return Status::OK();
+  }
 
-  input_partition_ = spilled_partitions_.front();
+  input_partition_ = std::move(spilled_partitions_.front());
   spilled_partitions_.pop_front();
-  DCHECK(input_partition_->is_spilled());
+  PhjBuilder::Partition* build_partition = input_partition_->build_partition();
+  DCHECK(build_partition->is_spilled());
 
-  // Reserve one buffer to read the probe side.
-  bool got_read_buffer;
-  RETURN_IF_ERROR(input_partition_->probe_rows()->PrepareForRead(true, &got_read_buffer));
-  if (!got_read_buffer) {
-    return mem_tracker()->MemLimitExceeded(
-        state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
-  }
-  ht_ctx_->set_level(input_partition_->level_);
-
-  int64_t mem_limit = mem_tracker()->SpareCapacity();
-  // Try to build a hash table on top the spilled build rows.
-  bool built = false;
-  int64_t estimated_memory = input_partition_->EstimatedInMemSize();
-  if (estimated_memory < mem_limit) {
-    ht_ctx_->set_level(input_partition_->level_);
-    RETURN_IF_ERROR(input_partition_->BuildHashTable(state, &built));
-  } else {
-    LOG(INFO) << "In hash join id=" << id_ << " the estimated needed memory ("
-        << estimated_memory << ") for partition " << input_partition_ << " with "
-        << input_partition_->build_rows()->num_rows() << " build rows is larger "
-        << " than the mem_limit (" << mem_limit << ").";
-  }
+  // Make sure we have a buffer to read the probe rows before we build the hash table.
+  RETURN_IF_ERROR(input_partition_->PrepareForRead());
+  ht_ctx_->set_level(build_partition->level());
+
+  // Try to build a hash table for the spilled build partition.
+  bool built;
+  RETURN_IF_ERROR(build_partition->BuildHashTable(&built));
 
   if (!built) {
     // This build partition still does not fit in memory, repartition.
-    UpdateState(REPARTITIONING);
-    DCHECK(input_partition_->is_spilled());
-    input_partition_->Spill(false);
-    ht_ctx_->set_level(input_partition_->level_ + 1);
-    int64_t num_input_rows = input_partition_->build_rows()->num_rows();
-    RETURN_IF_ERROR(ProcessBuildInput(state, input_partition_->level_ + 1));
+    UpdateState(REPARTITIONING_BUILD);
+
+    int next_partition_level = build_partition->level() + 1;
+    if (UNLIKELY(next_partition_level >= MAX_PARTITION_DEPTH)) {
+      return Status(TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH, id(),
+          MAX_PARTITION_DEPTH);
+    }
+    ht_ctx_->set_level(next_partition_level);
+
+    // Spill to free memory from hash tables and pinned streams for use in new partitions.
+    build_partition->Spill(BufferedTupleStream::UNPIN_ALL);
+    // Temporarily free up the probe buffer to use when repartitioning.
+    input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0) << NodeDebugString();
+    DCHECK_EQ(input_partition_->probe_rows()->blocks_pinned(), 0) << NodeDebugString();
+    int64_t num_input_rows = build_partition->build_rows()->num_rows();
+    RETURN_IF_ERROR(builder_->RepartitionBuildInput(
+        build_partition, next_partition_level, input_partition_->probe_rows()));
 
     // Check if there was any reduction in the size of partitions after repartitioning.
-    int64_t largest_partition = LargestSpilledPartition();
-    DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with "
-        "more rows than the input";
-    if (UNLIKELY(num_input_rows == largest_partition)) {
+    int64_t largest_partition_rows = builder_->LargestPartitionRows();
+    DCHECK_GE(num_input_rows, largest_partition_rows) << "Cannot have a partition with "
+                                                         "more rows than the input";
+    if (UNLIKELY(num_input_rows == largest_partition_rows)) {
       return Status(TErrorCode::PARTITIONED_HASH_JOIN_REPARTITION_FAILS, id_,
-          input_partition_->level_ + 1, num_input_rows);
+          next_partition_level, num_input_rows);
     }
+
+    RETURN_IF_ERROR(PrepareForProbe());
+    UpdateState(REPARTITIONING_PROBE);
   } else {
-    DCHECK(hash_partitions_.empty());
-    DCHECK(!input_partition_->is_spilled());
-    DCHECK(input_partition_->hash_tbl() != NULL);
+    DCHECK(!input_partition_->build_partition()->is_spilled());
+    DCHECK(input_partition_->build_partition()->hash_tbl() != NULL);
     // In this case, we did not have to partition the build again, we just built
     // a hash table. This means the probe does not have to be partitioned either.
     for (int i = 0; i < PARTITION_FANOUT; ++i) {
-      hash_tbls_[i] = input_partition_->hash_tbl();
+      hash_tbls_[i] = input_partition_->build_partition()->hash_tbl();
     }
     UpdateState(PROBING_SPILLED_PARTITION);
   }
 
-  COUNTER_ADD(num_repartitions_, 1);
   COUNTER_ADD(num_probe_rows_partitioned_, input_partition_->probe_rows()->num_rows());
+  *got_partition = true;
   return Status::OK();
 }
 
-int64_t PartitionedHashJoinNode::LargestSpilledPartition() const {
-  int64_t max_rows = 0;
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    DCHECK(partition != NULL) << i << " " << hash_partitions_.size();
-    if (partition->is_closed() || !partition->is_spilled()) continue;
-    int64_t rows = partition->build_rows()->num_rows();
-    rows += partition->probe_rows()->num_rows();
-    if (rows > max_rows) max_rows = rows;
-  }
-  return max_rows;
-}
-
 int PartitionedHashJoinNode::ProcessProbeBatch(
     const TJoinOp::type join_op, TPrefetchMode::type prefetch_mode,
     RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status) {
@@ -948,24 +474,26 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
 
   Status status = Status::OK();
   while (true) {
+    DCHECK(!*eos);
     DCHECK(status.ok());
     DCHECK_NE(state_, PARTITIONING_BUILD) << "Should not be in GetNext()";
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
 
-    if ((join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
-         join_op_ == TJoinOp::FULL_OUTER_JOIN) &&
-        !output_build_partitions_.empty())  {
-      // In case of right-outer, right-anti and full-outer joins, flush the remaining
-      // unmatched build rows of any partition we are done processing, before processing
-      // the next batch.
+    if (!output_build_partitions_.empty()) {
+      DCHECK(NeedToProcessUnmatchedBuildRows());
+
+      // Flush the remaining unmatched build rows of any partitions we are done
+      // processing before moving onto the next partition.
       OutputUnmatchedBuild(out_batch);
       if (!output_build_partitions_.empty()) break;
 
-      // Finished to output unmatched build rows, move to next partition.
-      DCHECK(hash_partitions_.empty());
-      RETURN_IF_ERROR(PrepareNextPartition(state));
-      if (input_partition_ == NULL) {
+      // Finished outputting unmatched build rows, move to next partition.
+      DCHECK_EQ(builder_->num_hash_partitions(), 0);
+      DCHECK(probe_hash_partitions_.empty());
+      bool got_partition;
+      RETURN_IF_ERROR(PrepareSpilledPartitionForProbe(state, &got_partition));
+      if (!got_partition) {
         *eos = true;
         break;
       }
@@ -974,7 +502,8 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
 
     if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
       // In this case, we want to output rows from the null aware partition.
-      if (null_aware_partition_ == NULL) {
+      if (builder_->null_aware_partition() == NULL) {
+        DCHECK(null_aware_probe_partition_ == NULL);
         *eos = true;
         break;
       }
@@ -985,14 +514,14 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
         continue;
       }
 
-      if (nulls_build_batch_.get() != NULL) {
+      if (nulls_build_batch_ != NULL) {
         RETURN_IF_ERROR(OutputNullAwareProbeRows(state, out_batch));
         if (out_batch->AtCapacity()) break;
         continue;
       }
     }
 
-    // Finish up the current batch.
+    // Finish processing rows in the current probe batch.
     if (probe_batch_pos_ != -1) {
       // Putting SCOPED_TIMER in ProcessProbeBatch() causes weird exception handling IR
       // in the xcompiled function, so call it here instead.
@@ -1026,10 +555,16 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
     }
 
     // Try to continue from the current probe side input.
-    if (input_partition_ == NULL) {
+    if (state_ == PARTITIONING_PROBE) {
+      DCHECK(input_partition_ == NULL);
       RETURN_IF_ERROR(NextProbeRowBatch(state, out_batch));
     } else {
-      RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
+      DCHECK(state_ == REPARTITIONING_PROBE || state_ == PROBING_SPILLED_PARTITION)
+          << state_;
+      DCHECK(probe_side_eos_);
+      if (input_partition_ != NULL) {
+        RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
+      }
     }
     // Free local allocations of the probe side expressions only after ExprValuesCache
     // has been reset.
@@ -1045,30 +580,34 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
     if (probe_batch_pos_ == 0) continue;
     DCHECK_EQ(probe_batch_pos_, -1);
 
-    // Finished up all probe rows for hash_partitions_.
-    RETURN_IF_ERROR(CleanUpHashPartitions(out_batch));
-    if (out_batch->AtCapacity()) break;
+    // Finished up all probe rows for 'hash_partitions_'. We may have already cleaned up
+    // the hash partitions, e.g. if we had to output some unmatched build rows below.
+    if (builder_->num_hash_partitions() != 0) {
+      RETURN_IF_ERROR(CleanUpHashPartitions(out_batch));
+      if (out_batch->AtCapacity()) break;
+    }
 
-    if ((join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
-         join_op_ == TJoinOp::FULL_OUTER_JOIN) &&
-        !output_build_partitions_.empty()) {
+    if (!output_build_partitions_.empty()) {
+      DCHECK(NeedToProcessUnmatchedBuildRows());
       // There are some partitions that need to flush their unmatched build rows.
-      continue;
+      OutputUnmatchedBuild(out_batch);
+      if (!output_build_partitions_.empty()) break;
     }
-    // Move onto the next partition.
-    RETURN_IF_ERROR(PrepareNextPartition(state));
+    // Move onto the next spilled partition.
+    bool got_partition;
+    RETURN_IF_ERROR(PrepareSpilledPartitionForProbe(state, &got_partition));
+    if (got_partition) continue; // Probe the spilled partition.
 
-    if (input_partition_ == NULL) {
-      if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-        RETURN_IF_ERROR(PrepareNullAwarePartition());
-      }
-      if (null_aware_partition_ == NULL) {
-        *eos = true;
-        break;
-      } else {
-        *eos = false;
-      }
+    if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+      // Prepare the null-aware partitions, then resume at the top of the loop to output
+      // the rows.
+      RETURN_IF_ERROR(PrepareNullAwarePartition());
+      continue;
     }
+    DCHECK(builder_->null_aware_partition() == NULL);
+
+    *eos = true;
+    break;
   }
 
   if (ReachedLimit()) *eos = true;
@@ -1077,8 +616,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
 
 void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
   SCOPED_TIMER(probe_timer_);
-  DCHECK(join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
-         join_op_ == TJoinOp::FULL_OUTER_JOIN);
+  DCHECK(NeedToProcessUnmatchedBuildRows());
   DCHECK(!output_build_partitions_.empty());
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
   const int num_conjuncts = conjunct_ctxs_.size();
@@ -1141,8 +679,9 @@ Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
 
 Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
     RowBatch* out_batch) {
-  DCHECK(null_aware_partition_ != NULL);
-  DCHECK(nulls_build_batch_.get() == NULL);
+  DCHECK(builder_->null_aware_partition() != NULL);
+  DCHECK(null_aware_probe_partition_ != NULL);
+  DCHECK(nulls_build_batch_ == NULL);
   DCHECK_NE(probe_batch_pos_, -1);
 
   if (probe_batch_pos_ == probe_batch_->num_rows()) {
@@ -1151,12 +690,12 @@ Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
     if (out_batch->AtCapacity()) return Status::OK();
     bool eos;
     RETURN_IF_ERROR(null_probe_rows_->GetNext(probe_batch_.get(), &eos));
-    if (probe_batch_->num_rows() == 0) {
+    if (probe_batch_->num_rows() == 0 && eos) {
       // All done.
-      null_aware_partition_->Close(out_batch);
-      null_aware_partition_ = NULL;
-      out_batch->AddTupleStream(null_probe_rows_);
-      null_probe_rows_ = NULL;
+      builder_->CloseNullAwarePartition(out_batch);
+      null_aware_probe_partition_->Close(out_batch);
+      null_aware_probe_partition_.reset();
+      out_batch->AddTupleStream(null_probe_rows_.release());
       return Status::OK();
     }
   }
@@ -1183,14 +722,54 @@ static Status NullAwareAntiJoinError(bool build) {
       " many NULLs on the $0 side to perform this join.", build ? "build" : "probe"));
 }
 
+Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
+  RuntimeState* state = runtime_state_;
+  unique_ptr<BufferedTupleStream> probe_rows = std::make_unique<BufferedTupleStream>(
+      state, child(0)->row_desc(), state->block_mgr(), builder_->block_mgr_client(),
+      false /* use_initial_small_buffers */, false /* read_write */);
+  Status status = probe_rows->Init(id(), runtime_profile(), false);
+  if (!status.ok()) goto error;
+  bool got_buffer;
+  status = probe_rows->PrepareForWrite(&got_buffer);
+  if (!status.ok()) goto error;
+  if (!got_buffer) {
+    status = state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id());
+    goto error;
+  }
+  null_aware_probe_partition_.reset(new ProbePartition(
+      state, this, builder_->null_aware_partition(), std::move(probe_rows)));
+  return Status::OK();
+
+error:
+  DCHECK(!status.ok());
+  // Ensure the temporary 'probe_rows' stream is closed correctly on error.
+  probe_rows->Close();
+  return status;
+}
+
+Status PartitionedHashJoinNode::InitNullProbeRows() {
+  RuntimeState* state = runtime_state_;
+  null_probe_rows_ = std::make_unique<BufferedTupleStream>(state, child(0)->row_desc(),
+      state->block_mgr(), builder_->block_mgr_client(),
+      false /* use_initial_small_buffers */, false /* read_write */);
+  RETURN_IF_ERROR(null_probe_rows_->Init(id(), runtime_profile(), false));
+  bool got_buffer;
+  RETURN_IF_ERROR(null_probe_rows_->PrepareForWrite(&got_buffer));
+  if (!got_buffer) {
+    return state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id());
+  }
+  return Status::OK();
+}
+
 Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
-  DCHECK(null_aware_partition_ != NULL);
-  DCHECK(nulls_build_batch_.get() == NULL);
+  DCHECK(builder_->null_aware_partition() != NULL);
+  DCHECK(null_aware_probe_partition_ != NULL);
+  DCHECK(nulls_build_batch_ == NULL);
   DCHECK_EQ(probe_batch_pos_, -1);
   DCHECK_EQ(probe_batch_->num_rows(), 0);
 
-  BufferedTupleStream* build_stream = null_aware_partition_->build_rows();
-  BufferedTupleStream* probe_stream = null_aware_partition_->probe_rows();
+  BufferedTupleStream* build_stream = builder_->null_aware_partition()->build_rows();
+  BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
 
   if (build_stream->num_rows() == 0) {
     // There were no build rows. Nothing to do. Just prepare to output the null
@@ -1219,14 +798,15 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
 
 Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     RowBatch* out_batch) {
-  DCHECK(null_aware_partition_ != NULL);
-  DCHECK(nulls_build_batch_.get() != NULL);
+  DCHECK(builder_->null_aware_partition() != NULL);
+  DCHECK(null_aware_probe_partition_ != NULL);
+  DCHECK(nulls_build_batch_ != NULL);
 
   ExprContext* const* join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
   int num_join_conjuncts = other_join_conjunct_ctxs_.size();
-  DCHECK(probe_batch_.get() != NULL);
+  DCHECK(probe_batch_ != NULL);
 
-  BufferedTupleStream* probe_stream = null_aware_partition_->probe_rows();
+  BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
   if (probe_batch_pos_ == probe_batch_->num_rows()) {
     probe_batch_pos_ = 0;
     probe_batch_->TransferResourceOwnership(out_batch);
@@ -1237,7 +817,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     RETURN_IF_ERROR(probe_stream->GetNext(probe_batch_.get(), &eos));
 
     if (probe_batch_->num_rows() == 0) {
-      RETURN_IF_ERROR(EvaluateNullProbe(null_aware_partition_->build_rows()));
+      RETURN_IF_ERROR(EvaluateNullProbe(builder_->null_aware_partition()->build_rows()));
       nulls_build_batch_.reset();
       RETURN_IF_ERROR(PrepareNullAwareNullProbe());
       return Status::OK();
@@ -1269,100 +849,55 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
   return Status::OK();
 }
 
-// When this function is called, we've finished processing the current build input
-// (either from child(1) or from repartitioning a spilled partition). The build rows
-// have only been partitioned, we still need to build hash tables over them. Some
-// of the partitions could have already been spilled and attempting to build hash
-// tables over the non-spilled ones can cause them to spill.
-//
-// At the end of the function we'd like all partitions to either have a hash table
-// (and therefore not spilled) or be spilled. Partitions that have a hash table don't
-// need to spill on the probe side.
-//
-// This maps perfectly to a 0-1 knapsack where the weight is the memory to keep the
-// build rows and hash table and the value is the expected IO savings.
-// For now, we go with a greedy solution.
-//
-// TODO: implement the knapsack solution.
-Status PartitionedHashJoinNode::BuildHashTables(RuntimeState* state) {
-  DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);
-
-  // First loop over the partitions and build hash tables for the partitions that did
-  // not already spill.
-  for (Partition* partition: hash_partitions_) {
-    if (partition->build_rows()->num_rows() == 0) {
-      // This partition is empty, no need to do anything else.
-      partition->Close(NULL);
-      continue;
-    }
-
-    if (!partition->is_spilled()) {
-      bool built = false;
-      DCHECK(partition->build_rows()->is_pinned());
-      RETURN_IF_ERROR(partition->BuildHashTable(state, &built));
-      // If we did not have enough memory to build this hash table, we need to spill this
-      // partition (clean up the hash table, unpin build).
-      if (!built) RETURN_IF_ERROR(partition->Spill(true));
-    }
-  }
+Status PartitionedHashJoinNode::PrepareForProbe() {
+  DCHECK_EQ(builder_->num_hash_partitions(), PARTITION_FANOUT);
+  DCHECK(probe_hash_partitions_.empty());
 
-  // Collect all the spilled partitions that don't have an IO buffer. We need to reserve
-  // an IO buffer for those partitions. Reserving an IO buffer can cause more partitions
-  // to spill so this process is recursive.
-  list<Partition*> spilled_partitions;
-  for (Partition* partition: hash_partitions_) {
-    if (partition->is_closed()) continue;
-    if (partition->is_spilled() && partition->probe_rows()->using_small_buffers()) {
-      spilled_partitions.push_back(partition);
-    }
-  }
-  while (!spilled_partitions.empty()) {
-    Partition* partition = spilled_partitions.front();
-    spilled_partitions.pop_front();
-
-    while (true) {
-      bool got_buffer;
-      RETURN_IF_ERROR(partition->probe_rows()->SwitchToIoBuffers(&got_buffer));
-      if (got_buffer) break;
-      Partition* spilled_partition;
-      RETURN_IF_ERROR(SpillPartition(&spilled_partition));
-      DCHECK(spilled_partition->is_spilled());
-      if (spilled_partition->probe_rows()->using_small_buffers()) {
-        spilled_partitions.push_back(spilled_partition);
-      }
-    }
+  // Initialize the probe partitions, providing them with probe streams.
+  vector<unique_ptr<BufferedTupleStream>> probe_streams = builder_->TransferProbeStreams();
+  probe_hash_partitions_.resize(PARTITION_FANOUT);
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
+    if (build_partition->IsClosed() || !build_partition->is_spilled()) continue;
 
-    DCHECK(partition->probe_rows()->has_write_block());
-    DCHECK(!partition->probe_rows()->using_small_buffers());
+    DCHECK(!probe_streams.empty()) << "Builder should have created enough streams";
+    CreateProbePartition(i, std::move(probe_streams.back()));
+    probe_streams.pop_back();
   }
+  DCHECK(probe_streams.empty()) << "Builder should not have created extra streams";
 
-  // At this point, the partition is in one of these states:
-  // 1. closed. All done, no buffers in either the build or probe stream.
-  // 2. in_mem. The build side is pinned and has a hash table built.
-  // 3. spilled. The build side is fully unpinned and the probe side has an io
-  //    sized buffer.
-  for (Partition* partition: hash_partitions_) {
-    if (partition->hash_tbl() != NULL) partition->probe_rows()->Close();
+  // Initialize the hash_tbl_ caching array.
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    hash_tbls_[i] = builder_->hash_partition(i)->hash_tbl();
   }
 
-  // TODO: at this point we could have freed enough memory to pin and build some
-  // spilled partitions. This can happen, for example is there is a lot of skew.
-  // Partition 1: 10GB (pinned initially).
-  // Partition 2,3,4: 1GB (spilled during partitioning the build).
-  // In the previous step, we could have unpinned 10GB (because there was not enough
-  // memory to build a hash table over it) which can now free enough memory to
-  // build hash tables over the remaining 3 partitions.
-  // We start by spilling the largest partition though so the build input would have
-  // to be pretty pathological.
-  // Investigate if this is worthwhile.
-
-  // Initialize the hash_tbl_ caching array.
+  // Validate the state of the partitions.
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    hash_tbls_[i] = hash_partitions_[i]->hash_tbl();
+    PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
+    ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+    if (build_partition->IsClosed()) {
+      DCHECK(hash_tbls_[i] == NULL);
+      DCHECK(probe_partition == NULL);
+    } else if (build_partition->is_spilled()) {
+      DCHECK(hash_tbls_[i] == NULL);
+      DCHECK(probe_partition != NULL);
+    } else {
+      DCHECK(hash_tbls_[i] != NULL);
+      DCHECK(probe_partition == NULL);
+    }
   }
   return Status::OK();
 }
 
+void PartitionedHashJoinNode::CreateProbePartition(
+    int partition_idx, unique_ptr<BufferedTupleStream> probe_rows) {
+  DCHECK_GE(partition_idx, 0);
+  DCHECK_LT(partition_idx, probe_hash_partitions_.size());
+  DCHECK(probe_hash_partitions_[partition_idx] == NULL);
+  probe_hash_partitions_[partition_idx] = std::make_unique<ProbePartition>(runtime_state_,
+      this, builder_->hash_partition(partition_idx), std::move(probe_rows));
+}
+
 Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
   if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
     return Status::OK();
@@ -1411,48 +946,59 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
   // add them to the list of partitions that need to output any unmatched build rows.
   // This partition will be closed by the function that actually outputs unmatched build
   // rows.
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    if (partition->is_closed()) continue;
-    if (partition->is_spilled()) {
-      DCHECK(partition->hash_tbl() == NULL) << NodeDebugString();
-      // Unpin the build and probe stream to free up more memory. We need to free all
-      // memory so we can recurse the algorithm and create new hash partitions from
-      // spilled partitions.
-      RETURN_IF_ERROR(partition->build_rows()->UnpinStream(true));
-      RETURN_IF_ERROR(partition->probe_rows()->UnpinStream(true));
-
-      // Push new created partitions at the front. This means a depth first walk
+  DCHECK_EQ(builder_->num_hash_partitions(), PARTITION_FANOUT);
+  DCHECK_EQ(probe_hash_partitions_.size(), PARTITION_FANOUT);
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+    PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
+    if (build_partition->IsClosed()) {
+      DCHECK(probe_partition == NULL);
+      continue;
+    }
+    if (build_partition->is_spilled()) {
+      DCHECK(probe_partition != NULL);
+      DCHECK(build_partition->hash_tbl() == NULL) << NodeDebugString();
+      // Unpin the probe stream to free up more memory. We need to free all memory so we
+      // can recurse the algorithm and create new hash partitions from spilled partitions.
+      // TODO: we shouldn't need to unpin the build stream if we stop spilling
+      // while probing.
+      RETURN_IF_ERROR(
+          build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+      DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0);
+      RETURN_IF_ERROR(
+          probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+
+      // Push newly created partitions at the front. This means a depth first walk
       // (more finely partitioned partitions are processed first). This allows us
       // to delete blocks earlier and bottom out the recursion earlier.
-      spilled_partitions_.push_front(partition);
+      spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
     } else {
-      DCHECK_EQ(partition->probe_rows()->num_rows(), 0)
-        << "No probe rows should have been spilled for this partition.";
-      if (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
-          join_op_ == TJoinOp::FULL_OUTER_JOIN) {
+      DCHECK(probe_partition == NULL);
+      if (NeedToProcessUnmatchedBuildRows()) {
         if (output_build_partitions_.empty()) {
-          hash_tbl_iterator_ = partition->hash_tbl_->FirstUnmatched(ht_ctx_.get());
+          hash_tbl_iterator_ = build_partition->hash_tbl()->FirstUnmatched(ht_ctx_.get());
         }
-        output_build_partitions_.push_back(partition);
+        output_build_partitions_.push_back(build_partition);
       } else if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
         // For NAAJ, we need to try to match all the NULL probe rows with this partition
         // before closing it. The NULL probe rows could have come from any partition
         // so we collect them all and match them at the end.
-        RETURN_IF_ERROR(EvaluateNullProbe(partition->build_rows()));
-        partition->Close(batch);
+        RETURN_IF_ERROR(EvaluateNullProbe(build_partition->build_rows()));
+        build_partition->Close(batch);
       } else {
-        partition->Close(batch);
+        build_partition->Close(batch);
       }
     }
   }
 
   // Just finished evaluating the null probe rows with all the non-spilled build
   // partitions. Unpin this now to free this memory for repartitioning.
-  if (null_probe_rows_ != NULL) RETURN_IF_ERROR(null_probe_rows_->UnpinStream());
+  if (null_probe_rows_ != NULL)
+    RETURN_IF_ERROR(
+        null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
 
-  hash_partitions_.clear();
-  input_partition_ = NULL;
+  builder_->ClearHashPartitions();
+  probe_hash_partitions_.clear();
   return Status::OK();
 }
 
@@ -1465,17 +1011,30 @@ void PartitionedHashJoinNode::AddToDebugString(int indent, stringstream* out) co
   *out << ")";
 }
 
-void PartitionedHashJoinNode::UpdateState(HashJoinState s) {
-  state_ = s;
+void PartitionedHashJoinNode::UpdateState(HashJoinState next_state) {
+  // Validate the state transition.
+  switch (state_) {
+    case PARTITIONING_BUILD: DCHECK_EQ(next_state, PARTITIONING_PROBE); break;
+    case PARTITIONING_PROBE:
+    case REPARTITIONING_PROBE:
+    case PROBING_SPILLED_PARTITION:
+      DCHECK(
+          next_state == REPARTITIONING_BUILD || next_state == PROBING_SPILLED_PARTITION);
+      break;
+    case REPARTITIONING_BUILD: DCHECK_EQ(next_state, REPARTITIONING_PROBE); break;
+    default: DCHECK(false) << "Invalid state " << state_;
+  }
+  state_ = next_state;
   VLOG(2) << "Transitioned State:" << endl << NodeDebugString();
 }
 
 string PartitionedHashJoinNode::PrintState() const {
   switch (state_) {
     case PARTITIONING_BUILD: return "PartitioningBuild";
-    case PROCESSING_PROBE: return "ProcessingProbe";
-    case PROBING_SPILLED_PARTITION: return "ProbingSpilledPartitions";
-    case REPARTITIONING: return "Repartitioning";
+    case PARTITIONING_PROBE: return "PartitioningProbe";
+    case PROBING_SPILLED_PARTITION: return "ProbingSpilledPartition";
+    case REPARTITIONING_BUILD: return "RepartitioningBuild";
+    case REPARTITIONING_PROBE: return "RepartitioningProbe";
     default: DCHECK(false);
   }
   return "";
@@ -1485,58 +1044,41 @@ string PartitionedHashJoinNode::NodeDebugString() const {
   stringstream ss;
   ss << "PartitionedHashJoinNode (id=" << id() << " op=" << join_op_
      << " state=" << PrintState()
-     << " #partitions=" << hash_partitions_.size()
-     << " #spilled_partitions=" << spilled_partitions_.size()
-     << ")" << endl;
-
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    ss << i << ": ptr=" << partition;
-    DCHECK(partition != NULL);
-    if (partition->is_closed()) {
-      ss << " Closed" << endl;
-      continue;
-    }
-    if (partition->is_spilled()) {
-      ss << " Spilled" << endl;
-    }
-    DCHECK(partition->build_rows() != NULL);
-    DCHECK(partition->probe_rows() != NULL);
-    ss << endl
-       << "   Build Rows: " << partition->build_rows()->num_rows()
-       << " (Blocks pinned: " << partition->build_rows()->blocks_pinned() << ")"
-       << endl;
-    ss << "   Probe Rows: " << partition->probe_rows()->num_rows()
-       << " (Blocks pinned: " << partition->probe_rows()->blocks_pinned() << ")"
+     << " #hash_partitions=" << builder_->num_hash_partitions()
+     << " #spilled_partitions=" << spilled_partitions_.size() << ")" << endl;
+
+  ss << "PhjBuilder: " << builder_->DebugString();
+
+  ss << "Probe hash partitions: " << probe_hash_partitions_.size() << ":" << endl;
+  for (int i = 0; i < probe_hash_partitions_.size(); ++i) {
+    ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+    ss << "  Probe hash partition " << i << ": probe ptr=" << probe_partition
+       << "    Probe Rows: " << probe_partition->probe_rows()->num_rows()
+       << "    (Blocks pinned: " << probe_partition->probe_rows()->blocks_pinned() << ")"
        << endl;
-    if (partition->hash_tbl() != NULL) {
-      ss << "   Hash Table Rows: " << partition->hash_tbl()->size() << endl;
-    }
   }
 
   if (!spilled_partitions_.empty()) {
     ss << "SpilledPartitions" << endl;
-    for (list<Partition*>::const_iterator it = spilled_partitions_.begin();
-        it != spilled_partitions_.end(); ++it) {
-      DCHECK((*it)->is_spilled());
-      DCHECK((*it)->hash_tbl() == NULL);
-      DCHECK((*it)->build_rows() != NULL);
-      DCHECK((*it)->probe_rows() != NULL);
-      ss << "  Partition=" << *it << endl
-         << "   Spilled Build Rows: "
-         << (*it)->build_rows()->num_rows() << endl
-         << "   Spilled Probe Rows: "
-         << (*it)->probe_rows()->num_rows() << endl;
+    for (const unique_ptr<ProbePartition>& probe_partition : spilled_partitions_) {
+      PhjBuilder::Partition* build_partition = probe_partition->build_partition();
+      DCHECK(build_partition->is_spilled());
+      DCHECK(build_partition->hash_tbl() == NULL);
+      DCHECK(build_partition->build_rows() != NULL);
+      DCHECK(probe_partition->probe_rows() != NULL);
+      ss << "  Partition=" << probe_partition.get() << endl
+         << "   Spilled Build Rows: " << build_partition->build_rows()->num_rows() << endl
+         << "   Spilled Probe Rows: " << probe_partition->probe_rows()->num_rows()
+         << endl;
     }
   }
   if (input_partition_ != NULL) {
-    DCHECK(input_partition_->build_rows() != NULL);
+    DCHECK(input_partition_->build_partition()->build_rows() != NULL);
     DCHECK(input_partition_->probe_rows() != NULL);
-    ss << "InputPartition: " << input_partition_ << endl
+    ss << "InputPartition: " << input_partition_.get() << endl
        << "   Spilled Build Rows: "
-       << input_partition_->build_rows()->num_rows() << endl
-       << "   Spilled Probe Rows: "
-       << input_partition_->probe_rows()->num_rows() << endl;
+       << input_partition_->build_partition()->build_rows()->num_rows() << endl
+       << "   Spilled Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl;
   } else {
     ss << "InputPartition: NULL" << endl;
   }
@@ -1648,81 +1190,14 @@ Status PartitionedHashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* codegen,
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
-    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
-
-  Function* process_build_batch_fn =
-      codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true);
-  DCHECK(process_build_batch_fn != NULL);
-
-  // Replace call sites
-  int replaced = codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn,
-      "EvalBuildRow");
-  DCHECK_EQ(replaced, 1);
-
-  // Replace some hash table parameters with constants.
-  HashTableCtx::HashTableReplacedConstants replaced_constants;
-  const bool stores_duplicates = true;
-  const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state, stores_duplicates,
-      num_build_tuples, process_build_batch_fn, &replaced_constants));
-  DCHECK_GE(replaced_constants.stores_nulls, 1);
-  DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
-  DCHECK_EQ(replaced_constants.stores_duplicates, 0);
-  DCHECK_EQ(replaced_constants.stores_tuples, 0);
-  DCHECK_EQ(replaced_constants.quadratic_probing, 0);
-
-  Function* process_build_batch_fn_level0 =
-      codegen->CloneFunction(process_build_batch_fn);
-
-  // Always build runtime filters at level0 (if there are any).
-  // Note that the first argument of this function is the return value.
-  Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 3);
-  build_filters_l0_arg->replaceAllUsesWith(
-      ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));
-
-  // process_build_batch_fn_level0 uses CRC hash if available,
-  replaced = codegen->ReplaceCallSites(process_build_batch_fn_level0, hash_fn, "HashRow");
-  DCHECK_EQ(replaced, 1);
-
-  // process_build_batch_fn uses murmur
-  replaced = codegen->ReplaceCallSites(process_build_batch_fn, murmur_hash_fn, "HashRow");
-  DCHECK_EQ(replaced, 1);
-
-  // Never build filters after repartitioning, as all rows have already been added to the
-  // filters during the level0 build. Note that the first argument of this function is the
-  // return value.
-  Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 3);
-  build_filters_arg->replaceAllUsesWith(
-      ConstantInt::get(Type::getInt1Ty(codegen->context()), false));
-
-  // Finalize ProcessBuildBatch functions
-  process_build_batch_fn = codegen->FinalizeFunction(process_build_batch_fn);
-  if (process_build_batch_fn == NULL) {
-    return Status("Codegen'd PartitionedHashJoinNode::ProcessBuildBatch() function "
-        "failed verification, see log");
-  }
-  process_build_batch_fn_level0 =
-      codegen->FinalizeFunction(process_build_batch_fn_level0);
-  if (process_build_batch_fn == NULL) {
-    return Status("Codegen'd level-zero PartitionedHashJoinNode::ProcessBuildBatch() "
-        "function failed verification, see log");
-  }
-
-  // Register native function pointers
-  codegen->AddFunctionToJit(process_build_batch_fn,
-                            reinterpret_cast<void**>(&process_build_batch_fn_));
-  codegen->AddFunctionToJit(process_build_batch_fn_level0,
-                            reinterpret_cast<void**>(&process_build_batch_fn_level0_));
-  return Status::OK();
-}
-
-Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
-    RuntimeState* state, Function* hash_fn, Function* murmur_hash_fn) {
+Status PartitionedHashJoinNode::CodegenProcessProbeBatch(RuntimeState* state) {
   LlvmCodeGen* codegen;
   RETURN_IF_ERROR(state->GetCodegen(&codegen));
+  // Codegen for hashing rows
+  Function* hash_fn;
+  Function* murmur_hash_fn;
+  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(state, false, &hash_fn));
+  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(state, true, &murmur_hash_fn));
 
   // Get cross compiled function
   IRFunction::Type ir_fn = IRFunction::FN_END;
@@ -1881,65 +1356,3 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
                             reinterpret_cast<void**>(&process_probe_batch_fn_level0_));
   return Status::OK();
 }
-
-Status PartitionedHashJoinNode::CodegenInsertBatch(RuntimeState* state,
-    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
-
-  Function* insert_batch_fn = codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true);
-  Function* build_equals_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(state, true, &build_equals_fn));
-
-  // Replace the parameter 'prefetch_mode' with constant.
-  Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
-  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
-  DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
-  DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
-  prefetch_mode_arg->replaceAllUsesWith(
-      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
-
-  // Use codegen'd EvalBuildRow() function
-  int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
-  DCHECK_EQ(replaced, 1);
-
-  // Use codegen'd Equals() function
-  replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals");
-  DCHECK_EQ(replaced, 1);
-
-  // Replace hash-table parameters with constants.
-  HashTableCtx::HashTableReplacedConstants replaced_constants;
-  const bool stores_duplicates = true;
-  const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state, stores_duplicates,
-      num_build_tuples, insert_batch_fn, &replaced_constants));
-  DCHECK_GE(replaced_constants.stores_nulls, 1);
-  DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
-  DCHECK_GE(replaced_constants.stores_duplicates, 1);
-  DCHECK_GE(replaced_constants.stores_tuples, 1);
-  DCHECK_GE(replaced_constants.quadratic_probing, 1);
-
-  Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn);
-
-  // Use codegen'd hash functions
-  replaced = codegen->ReplaceCallSites(insert_batch_fn_level0, hash_fn, "HashRow");
-  DCHECK_EQ(replaced, 1);
-  replaced = codegen->ReplaceCallSites(insert_batch_fn, murmur_hash_fn, "HashRow");
-  DCHECK_EQ(replaced, 1);
-
-  insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn);
-  if (insert_batch_fn == NULL) {
-    return Status("PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd "
-        "InsertBatch() function failed verification, see log");
-  }
-  insert_batch_fn_level0 = codegen->FinalizeFunction(insert_batch_fn_level0);
-  if (insert_batch_fn_level0 == NULL) {
-    return Status("PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd zero-level "
-        "InsertBatch() function failed verification, see log");
-  }
-
-  codegen->AddFunctionToJit(insert_batch_fn, reinterpret_cast<void**>(&insert_batch_fn_));
-  codegen->AddFunctionToJit(insert_batch_fn_level0,
-      reinterpret_cast<void**>(&insert_batch_fn_level0_));
-  return Status::OK();
-}