You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/29 00:32:33 UTC

[1/6] incubator-impala git commit: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Repository: incubator-impala
Updated Branches:
  refs/heads/master b7d107a69 -> 241c7e019


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 36dfce6..9827788 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -20,46 +20,88 @@
 #define IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H
 
 #include <boost/scoped_ptr.hpp>
-#include <boost/unordered_set.hpp>
 #include <boost/thread.hpp>
+#include <list>
+#include <memory>
 #include <string>
 
 #include "exec/blocking-join-node.h"
 #include "exec/exec-node.h"
-#include "exec/filter-context.h"
-#include "exec/hash-table.h"
+#include "exec/partitioned-hash-join-builder.h"
 #include "runtime/buffered-block-mgr.h"
 
-#include "gen-cpp/PlanNodes_types.h"  // for TJoinOp
+#include "gen-cpp/Types_types.h"
 
 namespace impala {
 
 class BloomFilter;
 class BufferedBlockMgr;
+class BufferedTupleStream;
 class MemPool;
 class RowBatch;
 class RuntimeFilter;
 class TupleRow;
-class BufferedTupleStream;
 
-/// Operator to perform partitioned hash join, spilling to disk as necessary.
-/// A spilled partition is one that is not fully pinned.
-/// The operator runs in these distinct phases:
-///  1. Consume all build input and partition them. No hash tables are maintained.
-///  2. Construct hash tables from as many partitions as possible.
-///  3. Consume all the probe rows. Rows belonging to partitions that are spilled
-///     must be spilled as well.
-///  4. Iterate over the spilled partitions, construct the hash table from the spilled
-///     build rows and process the spilled probe rows. If the partition is still too
-///     big, repeat steps 1-4, using this spilled partitions build and probe rows as
-///     input.
-//
+/// Operator to perform partitioned hash join, spilling to disk as necessary. This
+/// operator implements multiple join modes with the same algorithm.
+///
+/// The high-level algorithm is as follows:
+///  1. Consume all build input and partition it. No hash tables are maintained.
+///  2. Construct hash tables for as many unspilled partitions as possible.
+///  3. Consume the probe input. Each probe row is hashed to find the corresponding build
+///     partition. If the build partition is in-memory (i.e. not spilled), then the
+///     partition's hash table is probed and any matching rows can be outputted. If the
+///     build partition is spilled, the probe row must also be spilled for later
+///     processing.
+///  4. Any spilled partitions are processed. If the build rows and hash table for a
+///     spilled partition fit in memory, the spilled partition is brought into memory
+///     and its spilled probe rows are processed. Otherwise the spilled partition must be
+///     repartitioned into smaller partitions. Repartitioning repeats steps 1-3 above,
+///     except with the partition's spilled build and probe rows as input.
+///
+/// IMPLEMENTATION DETAILS:
+/// -----------------------
+/// The partitioned hash join algorithm is implemented with the PartitionedHashJoinNode
+/// and PhjBuilder classes. Each join node has a builder (see PhjBuilder) that
+/// partitions, stores and builds hash tables over the build rows.
+///
+/// The above algorithm is implemented as a state machine with the following phases:
+///
+///   1. [PARTITIONING_BUILD or REPARTITIONING_BUILD] Read build rows from child(1) OR
+///      from the spilled build rows of a partition and partition them into the builder's
+///      hash partitions. If there is sufficient memory, all build partitions are kept
+///      in memory. Otherwise, build partitions are spilled as needed to free up memory.
+///      Finally, build a hash table for each in-memory partition and create a probe
+///      partition with a write buffer for each spilled partition.
+///
+///      After the phase, the algorithm advances from PARTITIONING_BUILD to
+///      PARTITIONING_PROBE or from REPARTITIONING_BUILD to REPARTITIONING_PROBE.
+///
+///   2. [PARTITIONING_PROBE or REPARTITIONING_PROBE] Read the probe rows from child(0) or
+///      the spilled probe rows of a partition and partition them. If a probe row's
+///      partition is in memory, probe the partition's hash table, otherwise spill the
+///      probe row. Finally, output unmatched build rows for join modes that require it.
+///
+///      After the phase, the algorithm terminates if no spilled partitions remain or
+///      continues to process one of the remaining spilled partitions by advancing to
+///      either PROBING_SPILLED_PARTITION or REPARTITIONING_BUILD, depending on whether
+///      the spilled partition's hash table fits in memory or not.
+///
+///   3. [PROBING_SPILLED_PARTITION] Read the probe rows from a spilled partition that
+///      was brought back into memory and probe the partition's hash table. Finally,
+///      output unmatched build rows for join modes that require it.
+///
+///      After the phase, the algorithm terminates if no spilled partitions remain or
+///      continues to process one of the remaining spilled partitions by advancing to
+///      either PROBING_SPILLED_PARTITION or REPARTITIONING_BUILD, depending on whether
+///      the spilled partition's hash table fits in memory or not.
+///
+/// Null aware anti-join (NAAJ) extends the above algorithm by accumulating rows with
+/// NULLs into several different streams, which are processed in a separate step to
+/// produce additional output rows. The NAAJ algorithm is documented in more detail in
+/// header comments for the null aware functions and data structures.
+///
 /// TODO: don't copy tuple rows so often.
-/// TODO: we need multiple hash functions. Each repartition needs new hash functions
-/// or new bits. Multiplicative hashing?
-/// TODO: think about details about multithreading. Multiple partitions in parallel?
-/// Multiple threads against a single partition? How to build hash tables in parallel?
-/// TODO: BuildHashTables() should start with the partitions that are already pinned.
 class PartitionedHashJoinNode : public BlockingJoinNode {
  public:
   PartitionedHashJoinNode(ObjectPool* pool, const TPlanNode& tnode,
@@ -78,102 +120,53 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   virtual Status ProcessBuildInput(RuntimeState* state);
 
  private:
-  class Partition;
-
-  /// Implementation details:
-  /// Logically, the algorithm runs in three modes.
-  ///   1. [PARTITIONING_BUILD or REPARTITIONING] Read the build side rows and partition
-  ///      them into hash_partitions_. This is a fixed fan out of the input. The input
-  ///      can either come from child(1) OR from the build tuple stream of partition
-  ///      that needs to be repartitioned.
-  ///   2. [PROCESSING_PROBE or REPARTITIONING] Read the probe side rows, partition them
-  ///      and either perform the join or spill them into hash_partitions_. If the
-  ///      partition has the hash table in memory, we perform the join, otherwise we
-  ///      spill the probe row. Similar to step one, the rows can come from child(0) or
-  ///      a spilled partition.
-  ///   3. [PROBING_SPILLED_PARTITION] Read and construct a single spilled partition.
-  ///      In this case we are walking a spilled partition and the hash table fits in
-  ///      memory. Neither the build nor probe side need to be partitioned and we just
-  ///      perform the join.
-  ///
-  /// States:
-  /// The transition goes from PARTITIONING_BUILD -> PROCESSING_PROBE ->
-  ///    PROBING_SPILLED_PARTITION/REPARTITIONING.
-  /// The last two steps will switch back and forth as many times as we need to
-  /// repartition.
+  class ProbePartition;
+
   enum HashJoinState {
-    /// Partitioning the build (right) child's input. Corresponds to mode 1 above but
-    /// only when consuming from child(1).
+    /// Partitioning the build (right) child's input into the builder's hash partitions.
     PARTITIONING_BUILD,
 
-    /// Processing the probe (left) child's input. Corresponds to mode 2 above but
-    /// only when consuming from child(0).
-    PROCESSING_PROBE,
+    /// Processing the probe (left) child's input, probing hash tables and
+    /// spilling probe rows into 'probe_hash_partitions_' if necessary.
+    PARTITIONING_PROBE,
 
-    /// Probing a spilled partition. The hash table for this partition fits in memory.
-    /// Corresponds to mode 3.
+    /// Processing the spilled probe rows of a single spilled partition
+    /// ('input_partition_') that fits in memory.
     PROBING_SPILLED_PARTITION,
 
-    /// Repartitioning a single spilled partition (input_partition_) into
-    /// hash_partitions_.
-    /// Corresponds to mode 1 & 2 but reading from a spilled partition.
-    REPARTITIONING,
+    /// Repartitioning the build rows of a single spilled partition ('input_partition_')
+    /// into the builder's hash partitions.
+    /// Corresponds to PARTITIONING_BUILD but reading from a spilled partition.
+    REPARTITIONING_BUILD,
+
+    /// Probing the repartitioned hash partitions of a single spilled partition
+    /// ('input_partition_') with the probe rows of that partition.
+    /// Corresponds to PARTITIONING_PROBE but reading from a spilled partition.
+    REPARTITIONING_PROBE,
   };
 
-  /// Number of initial partitions to create. Must be a power of two.
-  /// TODO: this is set to a lower than actual value for testing.
-  static const int PARTITION_FANOUT = 16;
-
-  /// Needs to be the log(PARTITION_FANOUT)
-  static const int NUM_PARTITIONING_BITS = 4;
-
-  /// Maximum number of times we will repartition. The maximum build table we
-  /// can process is:
-  /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). With a (low) 1GB
-  /// limit and 64 fanout, we can support 256TB build tables in the case where
-  /// there is no skew.
-  /// In the case where there is skew, repartitioning is unlikely to help (assuming a
-  /// reasonable hash function).
-  /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx.
-  /// TODO: we can revisit and try harder to explicitly detect skew.
-  static const int MAX_PARTITION_DEPTH = 16;
-
-  /// Append the row to stream. In the common case, the row is just in memory and the
-  /// append succeeds. If the append fails, we fallback to the slower path of
-  /// AppendRowStreamFull().
-  /// Returns true if the row was added and false otherwise. If false is returned,
-  /// *status contains the error (doesn't return status because this is very perf
-  /// sensitive).
-  bool AppendRow(BufferedTupleStream* stream, TupleRow* row, Status* status);
-
-  /// Slow path for AppendRow() above. It is called when the stream has failed to append
-  /// the row. We need to find more memory by either switching to IO-buffers, in case the
-  /// stream still uses small buffers, or spilling a partition.
-  bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row, Status* status);
-
-  /// Called when we need to free up memory by spilling a partition.
-  /// This function walks hash_partitions_ and picks one to spill.
-  /// *spilled_partition is the partition that was spilled.
-  /// Returns non-ok status if we couldn't spill a partition.
-  Status SpillPartition(Partition** spilled_partition);
-
-  /// Partitions the entire build input (either from child(1) or input_partition_) into
-  /// hash_partitions_. When this call returns, hash_partitions_ is ready to consume
-  /// the probe input.
-  /// 'level' is the level new partitions (in hash_partitions_) should be created with.
-  Status ProcessBuildInput(RuntimeState* state, int level);
-
-  /// Reads the rows in build_batch and partitions them in hash_partitions_. If
-  /// 'build_filters' is true, runtime filters are populated.
-  Status ProcessBuildBatch(RowBatch* build_batch, bool build_filters);
-
-  /// Call at the end of partitioning the build rows (which could be from the build child
-  /// or from repartitioning an existing partition). After this function returns, all
-  /// partitions in hash_partitions_ are ready to accept probe rows. This function
-  /// constructs hash tables for as many partitions as fit in memory (which can be none).
-  /// For the remaining partitions, this function initializes the probe spilling
-  /// structures.
-  Status BuildHashTables(RuntimeState* state);
+  /// Constants from PhjBuilder, added to this node for convenience.
+  static const int PARTITION_FANOUT = PhjBuilder::PARTITION_FANOUT;
+  static const int NUM_PARTITIONING_BITS = PhjBuilder::NUM_PARTITIONING_BITS;
+  static const int MAX_PARTITION_DEPTH = PhjBuilder::MAX_PARTITION_DEPTH;
+
+  /// Initialize 'probe_hash_partitions_' and 'hash_tbls_' before probing. One probe
+  /// partition is created per spilled build partition, and 'hash_tbls_' is initialized
+  /// with pointers to the hash tables of in-memory partitions and NULL pointers for
+  /// spilled or closed partitions.
+  /// Called after the builder has partitioned the build rows and built hash tables,
+  /// either in the initial build step, or after repartitioning a spilled partition.
+  /// After this function returns, all partitions are ready to process probe rows.
+  Status PrepareForProbe();
+
+  /// Creates an initialized probe partition at 'partition_idx' in
+  /// 'probe_hash_partitions_'.
+  void CreateProbePartition(
+      int partition_idx, std::unique_ptr<BufferedTupleStream> probe_rows);
+
+  /// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have
+  /// a write buffer allocated, so this will succeed unless an error is encountered.
+  Status AppendProbeRow(BufferedTupleStream* stream, TupleRow* row);
 
   /// Probes the hash table for rows matching the current probe row and appends
   /// all the matching build rows (with probe row) to output batch. Returns true
@@ -292,6 +285,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// new partition at the front of output_build_partitions_.
   void OutputUnmatchedBuild(RowBatch* out_batch);
 
+  /// Initializes 'null_aware_probe_partition_' and prepares its probe stream for writing.
+  Status InitNullAwareProbePartition();
+
+  /// Initializes 'null_probe_rows_' and prepares that stream for writing.
+  Status InitNullProbeRows();
+
   /// Initializes null_aware_partition_ and nulls_build_batch_ to output rows.
   Status PrepareNullAwarePartition();
 
@@ -313,155 +312,75 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// false. Used for NAAJ.
   Status OutputNullAwareNullProbe(RuntimeState* state, RowBatch* out_batch);
 
-  /// Call at the end of consuming the probe rows. Walks hash_partitions_ and
-  ///  - If this partition had a hash table, close it. This partition is fully processed
-  ///    on both the build and probe sides. The streams are transferred to batch.
+  /// Call at the end of consuming the probe rows. Cleans up the build and probe hash
+  /// partitions and:
+  ///  - If the build partition had a hash table, close it. The build and probe
+  ///    partitions are fully processed. The streams are transferred to 'batch'.
   ///    In the case of right-outer and full-outer joins, instead of closing this
   ///    partition we put it on a list of partitions for which we need to flush their
   ///    unmatched rows.
-  ///  - If this partition did not have a hash table, meaning both sides were spilled,
-  ///    move the partition to spilled_partitions_.
+  ///  - If the build partition did not have a hash table, meaning both build and probe
+  ///    rows were spilled, move the partition to 'spilled_partitions_'.
   Status CleanUpHashPartitions(RowBatch* batch);
 
   /// Get the next row batch from the probe (left) side (child(0)). If we are done
-  /// consuming the input, sets probe_batch_pos_ to -1, otherwise, sets it to 0.
-  Status NextProbeRowBatch(RuntimeState*, RowBatch* out_batch);
+  /// consuming the input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.
+  Status NextProbeRowBatch(RuntimeState* state, RowBatch* out_batch);
 
-  /// Get the next probe row batch from input_partition_. If we are done consuming the
-  /// input, sets probe_batch_pos_ to -1, otherwise, sets it to 0.
-  Status NextSpilledProbeRowBatch(RuntimeState*, RowBatch* out_batch);
+  /// Get the next probe row batch from 'input_partition_'. If we are done consuming the
+  /// input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.
+  Status NextSpilledProbeRowBatch(RuntimeState* state, RowBatch* out_batch);
 
-  /// Moves onto the next spilled partition and initializes input_partition_. This
-  /// function processes the entire build side of input_partition_ and when this function
-  /// returns, we are ready to consume the probe side of input_partition_.
+  /// Moves onto the next spilled partition and initializes 'input_partition_'. This
+  /// function processes the entire build side of 'input_partition_' and when this
+  /// function returns, we are ready to consume the probe side of 'input_partition_'.
   /// If the build side's hash table fits in memory, we will construct input_partition_'s
   /// hash table. If it does not, meaning we need to repartition, this function will
-  /// initialize hash_partitions_.
-  Status PrepareNextPartition(RuntimeState*);
-
-  /// Iterates over all the partitions in hash_partitions_ and returns the number of rows
-  /// of the largest partition (in terms of number of build and probe rows).
-  int64_t LargestSpilledPartition() const;
+  /// repartition the build rows into 'builder_->hash_partitions_' and prepare for
+  /// repartitioning the partition's probe rows.
+  Status PrepareSpilledPartitionForProbe(RuntimeState* state, bool* got_partition);
 
-  /// Calls Close() on every Partition in 'hash_partitions_',
-  /// 'spilled_partitions_', and 'output_build_partitions_' and then resets the lists,
-  /// the vector and the partition pool.
-  void ClosePartitions();
+  /// Calls Close() on every probe partition, destroys the partitions and cleans up any
+  /// references to the partitions. Also closes and destroys 'null_probe_rows_'.
+  void CloseAndDeletePartitions();
 
   /// Prepares for probing the next batch.
   void ResetForProbe();
 
-  /// For each filter in filters_, allocate a bloom_filter from the fragment-local
-  /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build
-  /// phase.
-  void AllocateRuntimeFilters(RuntimeState* state);
-
-  /// Publish the runtime filters to the fragment-local
-  /// RuntimeFilterBank. 'total_build_rows' is used to determine whether the computed
-  /// filters have an unacceptably high false-positive rate.
-  void PublishRuntimeFilters(RuntimeState* state, int64_t total_build_rows);
-
   /// Codegen function to create output row. Assumes that the probe row is non-NULL.
   Status CodegenCreateOutputRow(LlvmCodeGen* codegen, llvm::Function** fn);
 
-  /// Codegen processing build batches.  Identical signature to ProcessBuildBatch.
-  /// Returns non-OK status if codegen was not possible.
-  Status CodegenProcessBuildBatch(RuntimeState* state, llvm::Function* hash_fn,
-      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn);
-
   /// Codegen processing probe batches.  Identical signature to ProcessProbeBatch.
   /// Returns non-OK if codegen was not possible.
-  Status CodegenProcessProbeBatch(
-      RuntimeState* state, llvm::Function* hash_fn, llvm::Function* murmur_hash_fn);
-
-  /// Codegen inserting batches into a partition's hash table. Identical signature to
-  /// Partition::InsertBatch(). Returns non-OK if codegen was not possible.
-  Status CodegenInsertBatch(RuntimeState* state, llvm::Function* hash_fn,
-      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn);
+  Status CodegenProcessProbeBatch(RuntimeState* state);
 
   /// Returns the current state of the partition as a string.
   std::string PrintState() const;
 
-  /// Updates state_ to 's', logging the transition.
-  void UpdateState(HashJoinState s);
+  /// Updates 'state_' to 'next_state', logging the transition.
+  void UpdateState(HashJoinState next_state);
 
   std::string NodeDebugString() const;
 
-  /// We need two output buffers per partition (one for build and one for probe) and
-  /// and two additional buffers for the input (while repartitioning; for the build and
-  /// probe sides).
-  /// For NAAJ, we need 3 additional buffers to maintain the null_aware_partition_.
-  int MinRequiredBuffers() const {
-    int num_reserved_buffers = PARTITION_FANOUT * 2 + 2;
-    num_reserved_buffers += join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0;
-    return num_reserved_buffers;
-  }
-
   RuntimeState* runtime_state_;
 
   /// Our equi-join predicates "<lhs> = <rhs>" are separated into
   /// build_expr_ctxs_ (over child(1)) and probe_expr_ctxs_ (over child(0))
-  std::vector<ExprContext*> probe_expr_ctxs_;
   std::vector<ExprContext*> build_expr_ctxs_;
-
-  /// List of filters to build during build phase.
-  std::vector<FilterContext> filters_;
-
-  /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
-  /// NOT DISTINCT FROM, rather than equality.
-  std::vector<bool> is_not_distinct_from_;
+  std::vector<ExprContext*> probe_expr_ctxs_;
 
   /// Non-equi-join conjuncts from the ON clause.
   std::vector<ExprContext*> other_join_conjunct_ctxs_;
 
-  /// Codegen doesn't allow for automatic Status variables because then exception
-  /// handling code is needed to destruct the Status, and our function call substitution
-  /// doesn't know how to deal with the LLVM IR 'invoke' instruction. Workaround that by
-  /// placing the build-side status here so exceptions won't need to destruct it.
-  /// This status should used directly only by ProcesssBuildBatch().
-  /// TODO: fix IMPALA-1948 and remove this.
-  Status build_status_;
-
-  /// Client to the buffered block mgr.
-  BufferedBlockMgr::Client* block_mgr_client_;
-
   /// Used for hash-related functionality, such as evaluating rows and calculating hashes.
-  /// TODO: If we want to multi-thread then this context should be thread-local and not
-  /// associated with the node.
   boost::scoped_ptr<HashTableCtx> ht_ctx_;
 
   /// The iterator that corresponds to the look up of current_probe_row_.
   HashTable::Iterator hash_tbl_iterator_;
 
-  /// Total time spent partitioning build.
-  RuntimeProfile::Counter* partition_build_timer_;
-
-  /// Total number of hash buckets across all partitions.
-  RuntimeProfile::Counter* num_hash_buckets_;
-
-  /// Total number of partitions created.
-  RuntimeProfile::Counter* partitions_created_;
-
-  /// Level of max partition (i.e. number of repartitioning steps).
-  RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
-
-  /// Number of build/probe rows that have been partitioned.
-  RuntimeProfile::Counter* num_build_rows_partitioned_;
+  /// Number of probe rows that have been partitioned.
   RuntimeProfile::Counter* num_probe_rows_partitioned_;
 
-  /// Number of partitions that have been repartitioned.
-  RuntimeProfile::Counter* num_repartitions_;
-
-  /// Number of partitions that have been spilled.
-  RuntimeProfile::Counter* num_spilled_partitions_;
-
-  /// The largest fraction (of build side) after repartitioning. This is expected to be
-  /// 1 / PARTITION_FANOUT. A value much larger indicates skew.
-  RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
-
-  /// Number of hash collisions - unequal rows that have identical hash values
-  RuntimeProfile::Counter* num_hash_collisions_;
-
   /// Time spent evaluating other_join_conjuncts for NAAJ.
   RuntimeProfile::Counter* null_aware_eval_timer_;
 
@@ -471,60 +390,52 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// State of the partitioned hash join algorithm. Used just for debugging.
   HashJoinState state_;
 
-  /// Object pool that holds the Partition objects in hash_partitions_.
-  boost::scoped_ptr<ObjectPool> partition_pool_;
-
-  /// The current set of partitions that are being built. This is only used in
-  /// mode 1 and 2 when we need to partition the build and probe inputs.
-  /// This is not used when processing a single partition.
-  /// After CleanUpHashPartitions() is called this vector should be empty.
-  std::vector<Partition*> hash_partitions_;
-
-  /// The list of partitions that have been spilled on both sides and still need more
-  /// processing. These partitions could need repartitioning, in which case more
-  /// partitions will be added to this list after repartitioning.
-  /// This list is populated at CleanUpHashPartitions().
-  std::list<Partition*> spilled_partitions_;
+  /// The build-side of the join. Initialized in Init().
+  boost::scoped_ptr<PhjBuilder> builder_;
 
   /// Cache of the per partition hash table to speed up ProcessProbeBatch.
   /// In the case where we need to partition the probe:
-  ///  hash_tbls_[i] = hash_partitions_[i]->hash_tbl();
+  ///  hash_tbls_[i] = builder_->hash_partitions_[i]->hash_tbl();
   /// In the case where we don't need to partition the probe:
   ///  hash_tbls_[i] = input_partition_->hash_tbl();
   HashTable* hash_tbls_[PARTITION_FANOUT];
 
-  /// The current input partition to be processed (not in spilled_partitions_).
-  /// This partition can either serve as the source for a repartitioning step, or
-  /// if the hash table fits in memory, the source of the probe rows.
-  Partition* input_partition_;
+  /// Probe partitions, with indices corresponding to the build partitions in
+  /// builder_->hash_partitions(). This is non-empty only in the PARTITIONING_PROBE or
+  /// REPARTITIONING_PROBE states, in which case it has NULL entries for in-memory
+  /// build partitions and non-NULL entries for spilled build partitions (so that we
+  /// have somewhere to spill the probe rows for the spilled partition).
+  std::vector<std::unique_ptr<ProbePartition>> probe_hash_partitions_;
+
+  /// The list of probe partitions that have been spilled and still need more
+  /// processing. These partitions could need repartitioning, in which case more
+  /// partitions will be added to this list after repartitioning.
+  /// This list is populated at CleanUpHashPartitions().
+  std::list<std::unique_ptr<ProbePartition>> spilled_partitions_;
+
+  /// The current spilled probe partition being processed as input to repartitioning,
+  /// or the source of the probe rows if the hash table fits in memory.
+  std::unique_ptr<ProbePartition> input_partition_;
 
   /// In the case of right-outer and full-outer joins, this is the list of the partitions
   /// for which we need to output their unmatched build rows.
   /// This list is populated at CleanUpHashPartitions().
-  std::list<Partition*> output_build_partitions_;
-
-  /// Partition used if null_aware_ is set. This partition is always processed at the end
-  /// after all build and probe rows are processed. Rows are added to this partition along
-  /// the way.
-  /// In this partition's build_rows_, we store all the rows for which build_expr_ctxs_
-  /// evaluated over the row returns NULL (i.e. it has a NULL on the eq join slot).
-  /// In this partition's probe_rows, we store all probe rows that did not have a match
-  /// in the hash table.
-  /// At the very end, we then iterate over all the probe rows. For each probe row, we
-  /// return the rows that did not match any of the build rows.
-  /// NULL if we this join is not null aware or we are done processing this partition.
-  Partition* null_aware_partition_;
+  std::list<PhjBuilder::Partition*> output_build_partitions_;
 
   /// Used while processing null_aware_partition_. It contains all the build tuple rows
   /// with a NULL when evaluating the hash table expr.
   boost::scoped_ptr<RowBatch> nulls_build_batch_;
 
-  /// If true, the build side has at least one row.
-  bool non_empty_build_;
+  /// Partition used if 'null_aware_' is set. During probing, rows from the probe
+  /// side that did not have a match in the hash table are appended to this partition.
+  /// At the very end, we then iterate over the partition's probe rows. For each probe
+  /// row, we return the rows that did not match any of the partition's build rows. This
+  /// is NULL if this join is not null aware or we are done processing this partition.
+  boost::scoped_ptr<ProbePartition> null_aware_probe_partition_;
 
   /// For NAAJ, this stream contains all probe rows that had NULL on the hash table
-  /// conjuncts.
-  BufferedTupleStream* null_probe_rows_;
+  /// conjuncts. Must be unique_ptr so we can release it and transfer to output batches.
+  std::unique_ptr<BufferedTupleStream> null_probe_rows_;
 
   /// For each row in null_probe_rows_, true if this row has matched any build row
   /// (i.e. the resulting joined row passes other_join_conjuncts).
@@ -538,99 +449,55 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 
-  class Partition {
+  /// The probe-side partition corresponding to a build partition. The probe partition
+  /// is created when a build partition is spilled so that probe rows can be spilled to
+  /// disk for later processing.
+  class ProbePartition {
    public:
-    Partition(RuntimeState* state, PartitionedHashJoinNode* parent, int level);
-    ~Partition();
-
-    BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_; }
-    BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_; }
-    HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
-
-    bool ALWAYS_INLINE is_closed() const { return is_closed_; }
-    bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
-
-    /// Must be called once per partition to release any resources. This should be called
-    /// as soon as possible to release memory.
-    /// If batch is non-null, the build and probe streams are attached to the batch,
-    /// transferring ownership to them.
+    /// Create a new probe partition. 'probe_rows' should be an empty unpinned stream
+    /// that has been prepared for writing with an I/O-sized write buffer.
+    ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent,
+        PhjBuilder::Partition* build_partition,
+        std::unique_ptr<BufferedTupleStream> probe_rows);
+    ~ProbePartition();
+
+    /// Prepare to read the probe rows. Allocates the first read block, so reads will
+    /// not fail with out of memory if this succeeds. Returns an error if the first read
+    /// block cannot be acquired. "delete_on_read" mode is used, so the blocks backing
+    /// the buffered tuple stream will be destroyed after reading.
+    Status PrepareForRead();
+
+    /// Close the partition and attach resources to 'batch' if non-NULL or free the
+    /// resources if 'batch' is NULL. Idempotent.
     void Close(RowBatch* batch);
 
-    /// Returns the estimated size of the in memory size for the build side of this
-    /// partition. This includes the entire build side and the hash table.
-    int64_t EstimatedInMemSize() const;
-
-    /// Pins the build tuples for this partition and constructs the hash_tbl_ from it.
-    /// Build rows cannot be added after calling this.
-    /// If the partition could not be built due to memory pressure, *built is set to false
-    /// and the caller is responsible for spilling this partition.
-    Status BuildHashTable(RuntimeState* state, bool* built);
+    BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); }
+    PhjBuilder::Partition* build_partition() { return build_partition_; }
 
-    /// Spills this partition, cleaning up and unpinning blocks.
-    /// If 'unpin_all_build' is true, the build stream is completely unpinned, otherwise,
-    /// it is unpinned with one buffer remaining.
-    Status Spill(bool unpin_all_build);
+    inline bool IsClosed() const { return probe_rows_ == NULL; }
 
    private:
-    friend class PartitionedHashJoinNode;
-
-    /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array
-    /// containing the index of each row's index into the hash table's tuple stream.
-    /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash
-    /// table buckets which the rows hashes to will be prefetched. This parameter is
-    /// replaced with a constant during codegen time. This function may be replaced with
-    /// a codegen'd version. Returns true if all rows in 'batch' are successfully
-    /// inserted.
-    bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
-        RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices);
-
     PartitionedHashJoinNode* parent_;
 
-    /// This partition is completely processed and nothing needs to be done for it again.
-    /// All resources associated with this partition are returned.
-    bool is_closed_;
-
-    /// True if this partition is spilled.
-    bool is_spilled_;
-
-    /// How many times rows in this partition have been repartitioned. Partitions created
-    /// from the node's children's input is level 0, 1 after the first repartitionining,
-    /// etc.
-    int level_;
+    /// The corresponding build partition. Not NULL. Owned by PhjBuilder.
+    PhjBuilder::Partition* build_partition_;
 
-    /// The hash table for this partition.
-    boost::scoped_ptr<HashTable> hash_tbl_;
-
-    /// Stream of build/probe tuples in this partition. Allocated from the runtime state's
-    /// object pool. Initially owned by this object (meaning it has to call Close() on it)
-    /// but transferred to the parent exec node (via the row batch) when the partition
-    /// is complete.
-    /// If NULL, ownership has been transfered.
-    BufferedTupleStream* build_rows_;
-    BufferedTupleStream* probe_rows_;
+    /// Stream of probe tuples in this partition. Initially owned by this object but
+    /// transferred to the parent exec node (via the row batch) when the partition
+    /// is complete. If NULL, ownership was transferred and the partition is closed.
+    std::unique_ptr<BufferedTupleStream> probe_rows_;
   };
 
   /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
   /// and is used when the partition level is 0, otherwise xxx_fn_ uses murmur hash and is
   /// used for subsequent levels.
 
-  typedef Status (*ProcessBuildBatchFn)(PartitionedHashJoinNode*, RowBatch*,
-      bool build_filters);
-  /// Jitted ProcessBuildBatch function pointers.  NULL if codegen is disabled.
-  ProcessBuildBatchFn process_build_batch_fn_;
-  ProcessBuildBatchFn process_build_batch_fn_level0_;
-
   typedef int (*ProcessProbeBatchFn)(PartitionedHashJoinNode*,
       TPrefetchMode::type, RowBatch*, HashTableCtx*, Status*);
   /// Jitted ProcessProbeBatch function pointers.  NULL if codegen is disabled.
   ProcessProbeBatchFn process_probe_batch_fn_;
   ProcessProbeBatchFn process_probe_batch_fn_level0_;
 
-  typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*,
-      RowBatch*, const std::vector<BufferedTupleStream::RowIdx>&);
-  /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
-  InsertBatchFn insert_batch_fn_;
-  InsertBatchFn insert_batch_fn_level0_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h
index ffeeb45..a53b40e 100644
--- a/be/src/exec/partitioned-hash-join-node.inline.h
+++ b/be/src/exec/partitioned-hash-join-node.inline.h
@@ -32,12 +32,6 @@ inline void PartitionedHashJoinNode::ResetForProbe() {
   ht_ctx_->expr_values_cache()->Reset();
 }
 
-inline bool PartitionedHashJoinNode::AppendRow(BufferedTupleStream* stream,
-    TupleRow* row, Status* status) {
-  if (LIKELY(stream->AddRow(row, status))) return true;
-  return AppendRowStreamFull(stream, row, status);
-}
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index b2c3973..bf7b595 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -15,15 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/runtime-state.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/mem-pool.h"
 #include "runtime/buffered-block-mgr.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/runtime-state.h"
 #include "runtime/tmp-file-mgr.h"
-#include "util/runtime-profile-counters.h"
+#include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
 #include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
 #include "util/uid-util.h"
 
 #include <openssl/rand.h>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 8d07584..20af23e 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -290,8 +290,13 @@ class SimpleTupleStreamTest : public testing::Test {
     BufferedTupleStream stream(runtime_state_, *desc, runtime_state_->block_mgr(),
         client_, use_small_buffers, false);
     ASSERT_OK(stream.Init(-1, NULL, true));
+    bool got_write_buffer;
+    ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+    ASSERT_TRUE(got_write_buffer);
 
-    if (unpin_stream) ASSERT_OK(stream.UnpinStream());
+    if (unpin_stream) {
+      ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    }
     // Add rows to the stream
     int offset = 0;
     for (int i = 0; i < num_batches; ++i) {
@@ -339,10 +344,15 @@ class SimpleTupleStreamTest : public testing::Test {
           client_, small_buffers == 0,  // initial small buffers
           true); // read_write
       ASSERT_OK(stream.Init(-1, NULL, true));
+      bool got_write_buffer;
+      ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+      ASSERT_TRUE(got_write_buffer);
       bool got_read_buffer;
       ASSERT_OK(stream.PrepareForRead(true, &got_read_buffer));
       ASSERT_TRUE(got_read_buffer);
-      if (unpin_stream) ASSERT_OK(stream.UnpinStream());
+      if (unpin_stream) {
+        ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+      }
 
       vector<int> results;
 
@@ -555,6 +565,9 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) {
   BufferedTupleStream stream(runtime_state_, *row_desc, runtime_state_->block_mgr(),
       client_, true, false);
   ASSERT_OK(stream.Init(-1, NULL, true));
+  bool got_write_buffer;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+  ASSERT_TRUE(got_write_buffer);
 
   int offset = 0;
   bool full = false;
@@ -571,7 +584,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) {
     offset += j;
   }
 
-  ASSERT_OK(stream.UnpinStream());
+  ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
 
   bool pinned = false;
   ASSERT_OK(stream.PinStream(false, &pinned));
@@ -624,6 +637,9 @@ TEST_F(SimpleTupleStreamTest, SmallBuffers) {
   BufferedTupleStream stream(runtime_state_, *int_desc_, runtime_state_->block_mgr(),
       client_, true, false);
   ASSERT_OK(stream.Init(-1, NULL, false));
+  bool got_write_buffer;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+  ASSERT_TRUE(got_write_buffer);
 
   // Initial buffer should be small.
   EXPECT_LT(stream.bytes_in_mem(false), buffer_size);
@@ -849,6 +865,9 @@ TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
   BufferedTupleStream stream(runtime_state_, *string_desc_, runtime_state_->block_mgr(),
       client_, false, false);
   ASSERT_OK(stream.Init(-1, NULL, false));
+  bool got_write_buffer;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+  ASSERT_TRUE(got_write_buffer);
 
   for (int i = 0; i < num_batches; ++i) {
     RowBatch* batch = CreateStringBatch(rows_added, 1, false);
@@ -1005,6 +1024,10 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   int num_array_lens = sizeof(array_lens) / sizeof(array_lens[0]);
   int array_len_index = 0;
   ASSERT_OK(stream.Init(-1, NULL, false));
+  bool got_write_buffer;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+  ASSERT_TRUE(got_write_buffer);
+
   for (int i = 0; i < NUM_ROWS; ++i) {
     int expected_row_size = tuple_descs[0]->byte_size() + tuple_descs[1]->byte_size();
     gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 978a221..b2fce45 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -120,9 +120,8 @@ BufferedTupleStream::~BufferedTupleStream() {
 // num_pinned_.
 int NumPinned(const list<BufferedBlockMgr::Block*>& blocks) {
   int num_pinned = 0;
-  for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks.begin();
-      it != blocks.end(); ++it) {
-    if ((*it)->is_pinned() && (*it)->is_max_size()) ++num_pinned;
+  for (BufferedBlockMgr::Block* block : blocks) {
+    if (block->is_pinned() && block->is_max_size()) ++num_pinned;
   }
   return num_pinned;
 }
@@ -141,10 +140,9 @@ string BufferedTupleStream::DebugString() const {
     ss << *read_block_;
   }
   ss << " blocks=[\n";
-  for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks_.begin();
-      it != blocks_.end(); ++it) {
-    ss << "{" << (*it)->DebugString() << "}";
-    if (*it != blocks_.back()) ss << ",\n";
+  for (BufferedBlockMgr::Block* block : blocks_) {
+    ss << "{" << block->DebugString() << "}";
+    if (block != blocks_.back()) ss << ",\n";
   }
   ss << "]";
   return ss.str();
@@ -169,15 +167,15 @@ Status BufferedTupleStream::Init(int node_id, RuntimeProfile* profile, bool pinn
   if (block_mgr_->max_block_size() < INITIAL_BLOCK_SIZES[0]) {
     use_small_buffers_ = false;
   }
-
-  bool got_block;
-  RETURN_IF_ERROR(NewWriteBlockForRow(fixed_tuple_row_size_, &got_block));
-  if (!got_block) return block_mgr_->MemLimitTooLowError(block_mgr_client_, node_id);
-  DCHECK(write_block_ != NULL);
-  if (!pinned) RETURN_IF_ERROR(UnpinStream());
+  if (!pinned) RETURN_IF_ERROR(UnpinStream(UNPIN_ALL_EXCEPT_CURRENT));
   return Status::OK();
 }
 
+Status BufferedTupleStream::PrepareForWrite(bool* got_buffer) {
+  DCHECK(write_block_ == NULL);
+  return NewWriteBlockForRow(fixed_tuple_row_size_, got_buffer);
+}
+
 Status BufferedTupleStream::SwitchToIoBuffers(bool* got_buffer) {
   if (!use_small_buffers_) {
     *got_buffer = (write_block_ != NULL);
@@ -194,9 +192,8 @@ Status BufferedTupleStream::SwitchToIoBuffers(bool* got_buffer) {
 }
 
 void BufferedTupleStream::Close() {
-  for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
-      it != blocks_.end(); ++it) {
-    (*it)->Delete();
+  for (BufferedBlockMgr::Block* block : blocks_) {
+    block->Delete();
   }
   blocks_.clear();
   num_pinned_ = 0;
@@ -206,12 +203,11 @@ void BufferedTupleStream::Close() {
 
 int64_t BufferedTupleStream::bytes_in_mem(bool ignore_current) const {
   int64_t result = 0;
-  for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks_.begin();
-      it != blocks_.end(); ++it) {
-    if (!(*it)->is_pinned()) continue;
-    if (!(*it)->is_max_size()) continue;
-    if (*it == write_block_ && ignore_current) continue;
-    result += (*it)->buffer_len();
+  for (BufferedBlockMgr::Block* block : blocks_) {
+    if (!block->is_pinned()) continue;
+    if (!block->is_max_size()) continue;
+    if (block == write_block_ && ignore_current) continue;
+    result += block->buffer_len();
   }
   return result;
 }
@@ -394,13 +390,12 @@ Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_buffer
     write_block_ = NULL;
   }
 
-  // Walk the blocks and pin the first non-io sized block.
-  for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
-      it != blocks_.end(); ++it) {
-    if (!(*it)->is_pinned()) {
+  // Walk the blocks and pin the first IO-sized block.
+  for (BufferedBlockMgr::Block* block : blocks_) {
+    if (!block->is_pinned()) {
       SCOPED_TIMER(pin_timer_);
       bool current_pinned;
-      RETURN_IF_ERROR((*it)->Pin(&current_pinned));
+      RETURN_IF_ERROR(block->Pin(&current_pinned));
       if (!current_pinned) {
         *got_buffer = false;
         return Status::OK();
@@ -408,7 +403,7 @@ Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_buffer
       ++num_pinned_;
       DCHECK_EQ(num_pinned_, NumPinned(blocks_));
     }
-    if ((*it)->is_max_size()) break;
+    if (block->is_max_size()) break;
   }
 
   read_block_ = blocks_.begin();
@@ -437,12 +432,11 @@ Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) {
     }
   }
 
-  for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
-      it != blocks_.end(); ++it) {
-    if ((*it)->is_pinned()) continue;
+  for (BufferedBlockMgr::Block* block : blocks_) {
+    if (block->is_pinned()) continue;
     {
       SCOPED_TIMER(pin_timer_);
-      RETURN_IF_ERROR((*it)->Pin(pinned));
+      RETURN_IF_ERROR(block->Pin(pinned));
     }
     if (!*pinned) {
       VLOG_QUERY << "Should have been reserved." << endl
@@ -457,9 +451,8 @@ Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) {
     // Populate block_start_idx_ on pin.
     DCHECK_EQ(block_start_idx_.size(), blocks_.size());
     block_start_idx_.clear();
-    for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
-        it != blocks_.end(); ++it) {
-      block_start_idx_.push_back((*it)->buffer());
+    for (BufferedBlockMgr::Block* block : blocks_) {
+      block_start_idx_.push_back(block->buffer());
     }
   }
   *pinned = true;
@@ -467,18 +460,20 @@ Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) {
   return Status::OK();
 }
 
-Status BufferedTupleStream::UnpinStream(bool all) {
+Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
   DCHECK(!closed_);
+  DCHECK(mode == UNPIN_ALL || mode == UNPIN_ALL_EXCEPT_CURRENT);
   SCOPED_TIMER(unpin_timer_);
 
   for (BufferedBlockMgr::Block* block: blocks_) {
     if (!block->is_pinned()) continue;
-    if (!all && (block == write_block_ || (read_write_ && block == *read_block_))) {
+    if (mode == UNPIN_ALL_EXCEPT_CURRENT
+        && (block == write_block_ || (read_write_ && block == *read_block_))) {
       continue;
     }
     RETURN_IF_ERROR(UnpinBlock(block));
   }
-  if (all) {
+  if (mode == UNPIN_ALL) {
     read_block_ = blocks_.end();
     write_block_ = NULL;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index e14f16c..d3bfa81 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -228,6 +228,13 @@ class BufferedTupleStream {
   /// 'node_id' is only used for error reporting.
   Status Init(int node_id, RuntimeProfile* profile, bool pinned);
 
+  /// Prepares the stream for writing by attempting to allocate a write block.
+  /// Called after Init() and before the first AddRow() call.
+  /// 'got_buffer': set to true if the first write block was successfully pinned, or
+  ///     false if the block could not be pinned and no error was encountered. Undefined
+  ///     if an error status is returned.
+  Status PrepareForWrite(bool* got_buffer);
+
   /// Must be called for streams using small buffers to switch to IO-sized buffers.
   /// If it fails to get a buffer (i.e. the switch fails) it resets the use_small_buffers_
   /// back to false.
@@ -269,9 +276,20 @@ class BufferedTupleStream {
   /// block_mgr_client_ to pin the stream.
   Status PinStream(bool already_reserved, bool* pinned);
 
-  /// Unpins stream. If all is true, all blocks are unpinned, otherwise all blocks
-  /// except the write_block_ and read_block_ are unpinned.
-  Status UnpinStream(bool all = false);
+  /// Modes for UnpinStream().
+  enum UnpinMode {
+    /// All blocks in the stream are unpinned and the read/write positions in the stream
+    /// are reset. No more rows can be written to the stream after this. The stream can
+    /// be re-read from the beginning by calling PrepareForRead().
+    UNPIN_ALL,
+    /// All blocks are unpinned aside from the current read and write blocks (if any),
+    /// which are left in the same state. The unpinned stream can continue being read
+    /// or written from the current read or write positions.
+    UNPIN_ALL_EXCEPT_CURRENT,
+  };
+
+  /// Unpins stream with the given 'mode' as described above.
+  Status UnpinStream(UnpinMode mode);
 
   /// Get the next batch of output rows. Memory is still owned by the BufferedTupleStream
   /// and must be copied out by the caller.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 58ab501..b19ba33 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -374,25 +374,40 @@ void RuntimeProfile::ComputeTimeInProfile(int64_t total) {
 }
 
 void RuntimeProfile::AddChild(RuntimeProfile* child, bool indent, RuntimeProfile* loc) {
-  DCHECK(child != NULL);
   lock_guard<SpinLock> l(children_lock_);
-  if (child_map_.count(child->name_) > 0) {
-    // This child has already been added, so do nothing.
-    // Otherwise, the map and vector will be out of sync.
-    return;
-  }
-  child_map_[child->name_] = child;
+  ChildVector::iterator insert_pos;
   if (loc == NULL) {
-    children_.push_back(make_pair(child, indent));
+    insert_pos = children_.end();
   } else {
+    bool found = false;
     for (ChildVector::iterator it = children_.begin(); it != children_.end(); ++it) {
       if (it->first == loc) {
-        children_.insert(++it, make_pair(child, indent));
-        return;
+        insert_pos = it + 1;
+        found = true;
+        break;
       }
     }
-    DCHECK(false) << "Invalid loc";
+    DCHECK(found) << "Invalid loc";
   }
+  AddChildLocked(child, indent, insert_pos);
+}
+
+void RuntimeProfile::AddChildLocked(
+    RuntimeProfile* child, bool indent, ChildVector::iterator insert_pos) {
+  children_lock_.DCheckLocked();
+  DCHECK(child != NULL);
+  if (child_map_.count(child->name_) > 0) {
+    // This child has already been added, so do nothing.
+    // Otherwise, the map and vector will be out of sync.
+    return;
+  }
+  child_map_[child->name_] = child;
+  children_.insert(insert_pos, make_pair(child, indent));
+}
+
+void RuntimeProfile::PrependChild(RuntimeProfile* child, bool indent) {
+  lock_guard<SpinLock> l(children_lock_);
+  AddChildLocked(child, indent, children_.begin());
 }
 
 void RuntimeProfile::GetChildren(vector<RuntimeProfile*>* children) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 77220d8..513f39d 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -124,6 +124,10 @@ class RuntimeProfile {
   void AddChild(RuntimeProfile* child,
       bool indent = true, RuntimeProfile* location = NULL);
 
+  /// Adds a child profile, similarly to AddChild(). The child profile is put before any
+  /// existing profiles.
+  void PrependChild(RuntimeProfile* child, bool indent = true);
+
   /// Sorts all children according to a custom comparator. Does not
   /// invalidate pointers to profiles.
   template <class Compare>
@@ -428,8 +432,13 @@ class RuntimeProfile {
 
   /// Create a subtree of runtime profiles from nodes, starting at *node_idx.
   /// On return, *node_idx is the index one past the end of this subtree
-  static RuntimeProfile* CreateFromThrift(ObjectPool* pool,
-      const std::vector<TRuntimeProfileNode>& nodes, int* node_idx);
+  static RuntimeProfile* CreateFromThrift(
+      ObjectPool* pool, const std::vector<TRuntimeProfileNode>& nodes, int* node_idx);
+
+  /// Inserts 'child' before the iterator 'insert_pos' in 'children_'.
+  /// 'children_lock_' must be held by the caller.
+  void AddChildLocked(
+      RuntimeProfile* child, bool indent, ChildVector::iterator insert_pos);
 
   /// Print the child counters of the given counter name
   static void PrintChildCounters(const std::string& prefix,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index a43b402..1d1d142 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -692,7 +692,7 @@ class Query(object):
 class QueryRunner(object):
   """Encapsulates functionality to run a query and provide a runtime report."""
 
-  SPILLED_PATTERN = re.compile("ExecOption:.*Spilled")
+  SPILLED_PATTERNS= [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
   BATCH_SIZE = 1024
 
   def __init__(self):
@@ -765,8 +765,8 @@ class QueryRunner(object):
           # Producing a query profile can be somewhat expensive. A v-tune profile of
           # impalad showed 10% of cpu time spent generating query profiles.
           report.profile = cursor.get_profile()
-          report.mem_was_spilled = \
-              QueryRunner.SPILLED_PATTERN.search(report.profile) is not None
+          report.mem_was_spilled = any([pattern.search(report.profile) is not None
+              for pattern in  QueryRunner.SPILLED_PATTERNS])
     except Exception as error:
       # A mem limit error would have been caught above, no need to check for that here.
       report.non_mem_limit_error = error


[2/6] incubator-impala git commit: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index f634193..46b91ba 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -31,10 +31,7 @@
 #include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
-#include "runtime/runtime-filter.h"
-#include "runtime/runtime-filter-bank.h"
 #include "runtime/runtime-state.h"
-#include "util/bloom-filter.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 
@@ -44,35 +41,26 @@
 
 DEFINE_bool(enable_phj_probe_side_filtering, true, "Deprecated.");
 
-const string PREPARE_FOR_READ_FAILED_ERROR_MSG = "Failed to acquire initial read buffer "
-    "for stream in hash join node $0. Reducing query concurrency or increasing the "
-    "memory limit may help this query to complete successfully.";
+static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
+    "Failed to acquire initial read buffer for stream in hash join node $0. Reducing "
+    "query concurrency or increasing the memory limit may help this query to complete "
+    "successfully.";
 
 using namespace impala;
 using namespace llvm;
 using namespace strings;
+using std::unique_ptr;
 
 PartitionedHashJoinNode::PartitionedHashJoinNode(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : BlockingJoinNode("PartitionedHashJoinNode", tnode.hash_join_node.join_op,
-        pool, tnode, descs),
-    is_not_distinct_from_(),
-    block_mgr_client_(NULL),
-    partition_build_timer_(NULL),
+  : BlockingJoinNode(
+        "PartitionedHashJoinNode", tnode.hash_join_node.join_op, pool, tnode, descs),
+    num_probe_rows_partitioned_(NULL),
     null_aware_eval_timer_(NULL),
     state_(PARTITIONING_BUILD),
-    partition_pool_(new ObjectPool()),
-    input_partition_(NULL),
-    null_aware_partition_(NULL),
-    non_empty_build_(false),
-    null_probe_rows_(NULL),
     null_probe_output_idx_(-1),
-    process_build_batch_fn_(NULL),
-    process_build_batch_fn_level0_(NULL),
     process_probe_batch_fn_(NULL),
-    process_probe_batch_fn_level0_(NULL),
-    insert_batch_fn_(NULL),
-    insert_batch_fn_level0_(NULL) {
+    process_probe_batch_fn_level0_(NULL) {
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
 }
 
@@ -86,35 +74,22 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
   DCHECK(tnode.__isset.hash_join_node);
   const vector<TEqJoinCondition>& eq_join_conjuncts =
       tnode.hash_join_node.eq_join_conjuncts;
-  for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
+  // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
+  // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
+  // being separated out further.
+  builder_.reset(
+      new PhjBuilder(id(), join_op_, child(0)->row_desc(), child(1)->row_desc(), state));
+  RETURN_IF_ERROR(builder_->Init(state, eq_join_conjuncts, tnode.runtime_filters));
+
+  for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
     ExprContext* ctx;
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjuncts[i].left, &ctx));
+    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjunct.left, &ctx));
     probe_expr_ctxs_.push_back(ctx);
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjuncts[i].right, &ctx));
+    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjunct.right, &ctx));
     build_expr_ctxs_.push_back(ctx);
-    is_not_distinct_from_.push_back(eq_join_conjuncts[i].is_not_distinct_from);
   }
-  RETURN_IF_ERROR(
-      Expr::CreateExprTrees(pool_, tnode.hash_join_node.other_join_conjuncts,
-                            &other_join_conjunct_ctxs_));
-
-  for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) {
-    // If filter propagation not enabled, only consider building broadcast joins (that may
-    // be consumed by this fragment).
-    if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL &&
-        !filter.is_broadcast_join) {
-      continue;
-    }
-    if (state->query_options().disable_row_runtime_filtering
-        && !filter.applied_on_partition_columns) {
-      continue;
-    }
-    FilterContext filter_ctx;
-    filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, true);
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, filter.src_expr, &filter_ctx.expr));
-    filters_.push_back(filter_ctx);
-  }
-
+  RETURN_IF_ERROR(Expr::CreateExprTrees(
+      pool_, tnode.hash_join_node.other_join_conjuncts, &other_join_conjunct_ctxs_));
   DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
       eq_join_conjuncts.size() == 1);
   return Status::OK();
@@ -134,20 +109,17 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   runtime_state_ = state;
 
+  RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
+  runtime_profile()->PrependChild(builder_->profile());
+
   // build and probe exprs are evaluated in the context of the rows produced by our
   // right and left children, respectively
   RETURN_IF_ERROR(
       Expr::Prepare(build_expr_ctxs_, state, child(1)->row_desc(), expr_mem_tracker()));
   RETURN_IF_ERROR(
       Expr::Prepare(probe_expr_ctxs_, state, child(0)->row_desc(), expr_mem_tracker()));
-  for (const FilterContext& ctx: filters_) {
-    RETURN_IF_ERROR(ctx.expr->Prepare(state, child(1)->row_desc(), expr_mem_tracker()));
-    AddExprCtxToFree(ctx.expr);
-  }
 
-  // Although ProcessBuildInput() may be run in a separate thread, it is safe to free
-  // local allocations in QueryMaintenance() since the build thread is not run
-  // concurrently with other expr evaluation in this join node.
+  // Build expressions may be evaluated during probing, so must be freed.
   // Probe side expr is not included in QueryMaintenance(). We cache the probe expression
   // values in ExprValuesCache. Local allocations need to survive until the cache is reset
   // so we need to manually free probe expr local allocations.
@@ -161,84 +133,25 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
       Expr::Prepare(other_join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker()));
   AddExprCtxsToFree(other_join_conjunct_ctxs_);
 
-  RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
-      Substitute("PartitionedHashJoinNode id=$0 ptr=$1", id_, this),
-      MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_));
-
-  const bool should_store_nulls = join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
-      join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN ||
-      std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), false,
-                      std::logical_or<bool>());
   RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, probe_expr_ctxs_,
-      should_store_nulls, is_not_distinct_from_, state->fragment_hash_seed(),
-      MAX_PARTITION_DEPTH, child(1)->row_desc().tuple_descriptors().size(), mem_tracker(),
-      &ht_ctx_));
+      builder_->HashTableStoresNulls(), builder_->is_not_distinct_from(),
+      state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
+      child(1)->row_desc().tuple_descriptors().size(), mem_tracker(), &ht_ctx_));
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime");
   }
 
-  partition_build_timer_ = ADD_TIMER(runtime_profile(), "BuildPartitionTime");
-  num_hash_buckets_ =
-      ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
-  partitions_created_ =
-      ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
-  max_partition_level_ = runtime_profile()->AddHighWaterMarkCounter(
-      "MaxPartitionLevel", TUnit::UNIT);
-  num_build_rows_partitioned_ =
-      ADD_COUNTER(runtime_profile(), "BuildRowsPartitioned", TUnit::UNIT);
   num_probe_rows_partitioned_ =
       ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
-  num_repartitions_ =
-      ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
-  num_spilled_partitions_ =
-      ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT);
-  largest_partition_percent_ = runtime_profile()->AddHighWaterMarkCounter(
-      "LargestPartitionPercent", TUnit::UNIT);
-  num_hash_collisions_ =
-      ADD_COUNTER(runtime_profile(), "HashCollisions", TUnit::UNIT);
-
-  bool build_codegen_enabled = false;
+
   bool probe_codegen_enabled = false;
-  bool ht_construction_codegen_enabled = false;
-  Status codegen_status;
-  Status build_codegen_status;
   Status probe_codegen_status;
-  Status insert_codegen_status;
   if (state->codegen_enabled()) {
-    // Codegen for hashing rows
-    Function* hash_fn;
-    codegen_status = ht_ctx_->CodegenHashRow(state, false, &hash_fn);
-    Function* murmur_hash_fn;
-    codegen_status.MergeStatus(ht_ctx_->CodegenHashRow(state, true, &murmur_hash_fn));
-
-    // Codegen for evaluating build rows
-    Function* eval_build_row_fn;
-    codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_build_row_fn));
-
-    if (codegen_status.ok()) {
-      // Codegen for build path
-      build_codegen_status =
-          CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_build_row_fn);
-      if (build_codegen_status.ok()) build_codegen_enabled = true;
-      // Codegen for probe path
-      probe_codegen_status = CodegenProcessProbeBatch(state, hash_fn, murmur_hash_fn);
-      if (probe_codegen_status.ok()) probe_codegen_enabled = true;
-      // Codegen for InsertBatch()
-      insert_codegen_status = CodegenInsertBatch(state, hash_fn, murmur_hash_fn,
-          eval_build_row_fn);
-      if (insert_codegen_status.ok()) ht_construction_codegen_enabled = true;
-    } else {
-      build_codegen_status = codegen_status;
-      probe_codegen_status = codegen_status;
-      insert_codegen_status = codegen_status;
-    }
+    probe_codegen_status = CodegenProcessProbeBatch(state);
+    probe_codegen_enabled = probe_codegen_status.ok();
   }
   runtime_profile()->AddCodegenMsg(
-      build_codegen_enabled, codegen_status, "Build Side");
-  runtime_profile()->AddCodegenMsg(
-      probe_codegen_enabled, codegen_status, "Probe Side");
-  runtime_profile()->AddCodegenMsg(
-      ht_construction_codegen_enabled, codegen_status, "Hash Table Construction");
+      probe_codegen_enabled, probe_codegen_status, "Probe Side");
   return Status::OK();
 }
 
@@ -248,19 +161,10 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
   RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state));
   RETURN_IF_ERROR(Expr::Open(other_join_conjunct_ctxs_, state));
-  for (const FilterContext& filter: filters_) RETURN_IF_ERROR(filter.expr->Open(state));
-  AllocateRuntimeFilters(state);
 
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-    null_aware_partition_ = partition_pool_->Add(new Partition(state, this, 0));
-    RETURN_IF_ERROR(
-        null_aware_partition_->build_rows()->Init(id(), runtime_profile(), false));
-    RETURN_IF_ERROR(
-        null_aware_partition_->probe_rows()->Init(id(), runtime_profile(), false));
-    null_probe_rows_ = new BufferedTupleStream(
-        state, child(0)->row_desc(), state->block_mgr(), block_mgr_client_,
-        true /* use_initial_small_buffers */, false /* read_write */ );
-    RETURN_IF_ERROR(null_probe_rows_->Init(id(), runtime_profile(), false));
+    RETURN_IF_ERROR(InitNullAwareProbePartition());
+    RETURN_IF_ERROR(InitNullProbeRows());
   }
 
   // Check for errors and free local allocations before opening children.
@@ -269,489 +173,116 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   // The prepare functions of probe expressions may have done local allocations implicitly
   // (e.g. calling UdfBuiltins::Lower()). The probe expressions' local allocations need to
   // be freed now as they don't get freed again till probing. Other exprs' local allocations
-  // are freed in ExecNode::FreeLocalAllocations() in ProcessBuildInput().
+  // are freed in ExecNode::FreeLocalAllocations().
   ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
 
-  RETURN_IF_ERROR(BlockingJoinNode::ConstructBuildAndOpenProbe(state, NULL));
+  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
+  RETURN_IF_ERROR(PrepareForProbe());
+
+  UpdateState(PARTITIONING_PROBE);
   RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
   ResetForProbe();
-  DCHECK(null_aware_partition_ == NULL || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
+  DCHECK(null_aware_probe_partition_ == NULL
+      || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
   return Status::OK();
 }
 
 Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-    non_empty_build_ = false;
     null_probe_output_idx_ = -1;
     matched_null_probe_.clear();
     nulls_build_batch_.reset();
   }
   state_ = PARTITIONING_BUILD;
   ht_ctx_->set_level(0);
-  ClosePartitions();
+  CloseAndDeletePartitions();
+  builder_->Reset();
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
   return ExecNode::Reset(state);
 }
 
-void PartitionedHashJoinNode::ClosePartitions() {
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    hash_partitions_[i]->Close(NULL);
+Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state) {
+  DCHECK(false) << "Should not be called, PHJ uses the BuildSink API";
+  return Status::OK();
+}
+
+void PartitionedHashJoinNode::CloseAndDeletePartitions() {
+  // Close all the partitions and clean up all references to them.
+  for (unique_ptr<ProbePartition>& partition : probe_hash_partitions_) {
+    if (partition != NULL) partition->Close(NULL);
   }
-  hash_partitions_.clear();
-  for (list<Partition*>::iterator it = spilled_partitions_.begin();
-      it != spilled_partitions_.end(); ++it) {
-    (*it)->Close(NULL);
+  probe_hash_partitions_.clear();
+  for (unique_ptr<ProbePartition>& partition : spilled_partitions_) {
+    partition->Close(NULL);
   }
   spilled_partitions_.clear();
-  for (list<Partition*>::iterator it = output_build_partitions_.begin();
-      it != output_build_partitions_.end(); ++it) {
-    (*it)->Close(NULL);
-  }
-  output_build_partitions_.clear();
   if (input_partition_ != NULL) {
     input_partition_->Close(NULL);
-    input_partition_ = NULL;
+    input_partition_.reset();
   }
-  if (null_aware_partition_ != NULL) {
-    null_aware_partition_->Close(NULL);
-    null_aware_partition_ = NULL;
+  if (null_aware_probe_partition_ != NULL) {
+    null_aware_probe_partition_->Close(NULL);
+    null_aware_probe_partition_.reset();
   }
+  output_build_partitions_.clear();
   if (null_probe_rows_ != NULL) {
     null_probe_rows_->Close();
-    delete null_probe_rows_;
-    null_probe_rows_ = NULL;
+    null_probe_rows_.reset();
   }
-  partition_pool_->Clear();
 }
 
 void PartitionedHashJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  if (ht_ctx_.get() != NULL) ht_ctx_->Close();
-
+  if (ht_ctx_ != NULL) ht_ctx_->Close();
   nulls_build_batch_.reset();
-
-  ClosePartitions();
-
-  if (block_mgr_client_ != NULL) {
-    state->block_mgr()->ClearReservations(block_mgr_client_);
-  }
+  CloseAndDeletePartitions();
+  if (builder_ != NULL) builder_->Close(state);
   Expr::Close(build_expr_ctxs_, state);
   Expr::Close(probe_expr_ctxs_, state);
   Expr::Close(other_join_conjunct_ctxs_, state);
-  for (const FilterContext& ctx: filters_) {
-    ctx.expr->Close(state);
-  }
   BlockingJoinNode::Close(state);
 }
 
-PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
-    PartitionedHashJoinNode* parent, int level)
+PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state,
+    PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition,
+    unique_ptr<BufferedTupleStream> probe_rows)
   : parent_(parent),
-    is_closed_(false),
-    is_spilled_(false),
-    level_(level) {
-  build_rows_ = new BufferedTupleStream(state, parent_->child(1)->row_desc(),
-      state->block_mgr(), parent_->block_mgr_client_,
-      true /* use_initial_small_buffers */, false /* read_write */);
-  DCHECK(build_rows_ != NULL);
-  probe_rows_ = new BufferedTupleStream(state, parent_->child(0)->row_desc(),
-      state->block_mgr(), parent_->block_mgr_client_,
-      true /* use_initial_small_buffers */, false /* read_write */ );
-  DCHECK(probe_rows_ != NULL);
-}
-
-PartitionedHashJoinNode::Partition::~Partition() {
-  DCHECK(is_closed());
+    build_partition_(build_partition),
+    probe_rows_(std::move(probe_rows)) {
+  DCHECK(probe_rows_->has_write_block());
+  DCHECK(!probe_rows_->using_small_buffers());
+  DCHECK(!probe_rows_->is_pinned());
 }
 
-int64_t PartitionedHashJoinNode::Partition::EstimatedInMemSize() const {
-  return build_rows_->byte_size() + HashTable::EstimateSize(build_rows_->num_rows());
+PartitionedHashJoinNode::ProbePartition::~ProbePartition() {
+  DCHECK(IsClosed());
 }
 
-void PartitionedHashJoinNode::Partition::Close(RowBatch* batch) {
-  if (is_closed()) return;
-  is_closed_ = true;
-
-  if (hash_tbl_.get() != NULL) {
-    COUNTER_ADD(parent_->num_hash_collisions_, hash_tbl_->NumHashCollisions());
-    hash_tbl_->Close();
-  }
-
-  // Transfer ownership of build_rows_/probe_rows_ to batch if batch is not NULL.
-  // Otherwise, close the stream here.
-  if (build_rows_ != NULL) {
-    if (batch == NULL) {
-      build_rows_->Close();
-      delete build_rows_;
-    } else {
-      batch->AddTupleStream(build_rows_);
-    }
-    build_rows_ = NULL;
-  }
-  if (probe_rows_ != NULL) {
-    if (batch == NULL) {
-      probe_rows_->Close();
-      delete probe_rows_;
-    } else {
-      batch->AddTupleStream(probe_rows_);
-    }
-    probe_rows_ = NULL;
-  }
-}
-
-Status PartitionedHashJoinNode::Partition::Spill(bool unpin_all_build) {
-  DCHECK(!is_closed_);
-  // Spilling should occur before we start processing probe rows.
-  DCHECK(parent_->state_ != PROCESSING_PROBE &&
-         parent_->state_ != PROBING_SPILLED_PARTITION) << parent_->state_;
-  DCHECK((is_spilled_ && parent_->state_ == REPARTITIONING) ||
-         probe_rows_->num_rows() == 0);
-  // Close the hash table as soon as possible to release memory.
-  if (hash_tbl() != NULL) {
-    hash_tbl_->Close();
-    hash_tbl_.reset();
-  }
-
-  bool got_buffer = true;
-  if (build_rows_->using_small_buffers()) {
-    RETURN_IF_ERROR(build_rows_->SwitchToIoBuffers(&got_buffer));
-  }
-  // Unpin the stream as soon as possible to increase the chances that the
-  // SwitchToIoBuffers() call below will succeed.
-  RETURN_IF_ERROR(build_rows_->UnpinStream(unpin_all_build));
-
-  if (got_buffer && probe_rows_->using_small_buffers()) {
-    RETURN_IF_ERROR(probe_rows_->SwitchToIoBuffers(&got_buffer));
-  }
-  if (!got_buffer) {
-    // We'll try again to get the buffers when the stream fills up the small buffers.
-    VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition "
-               << this << " of join=" << parent_->id_ << " build small buffers="
-               << build_rows_->using_small_buffers() << " probe small buffers="
-               << probe_rows_->using_small_buffers();
-    VLOG_FILE << GetStackTrace();
-  }
-
-  if (!is_spilled_) {
-    COUNTER_ADD(parent_->num_spilled_partitions_, 1);
-    if (parent_->num_spilled_partitions_->value() == 1) {
-      parent_->runtime_profile()->AppendExecOption("Spilled");
-    }
-  }
-
-  is_spilled_ = true;
-  return Status::OK();
-}
-
-Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
-    bool* built) {
-  DCHECK(build_rows_ != NULL);
-  *built = false;
-
-  // TODO: estimate the entire size of the hash table and reserve all of it from
-  // the block mgr.
-
-  // We got the buffers we think we will need, try to build the hash table.
-  RETURN_IF_ERROR(build_rows_->PinStream(false, built));
-  if (!*built) return Status::OK();
+Status PartitionedHashJoinNode::ProbePartition::PrepareForRead() {
   bool got_read_buffer;
-  RETURN_IF_ERROR(build_rows_->PrepareForRead(false, &got_read_buffer));
-  DCHECK(got_read_buffer) << "Stream was already pinned.";
-
-  RowBatch batch(parent_->child(1)->row_desc(), state->batch_size(),
-      parent_->mem_tracker());
-  HashTableCtx* ctx = parent_->ht_ctx_.get();
-  // TODO: move the batch and indices as members to avoid reallocating.
-  vector<BufferedTupleStream::RowIdx> indices;
-  bool eos = false;
-
-  // Allocate the partition-local hash table. Initialize the number of buckets based on
-  // the number of build rows (the number of rows is known at this point). This assumes
-  // there are no duplicates which can be wrong. However, the upside in the common case
-  // (few/no duplicates) is large and the downside when there are is low (a bit more
-  // memory; the bucket memory is small compared to the memory needed for all the build
-  // side allocations).
-  // One corner case is if the stream contains tuples with zero footprint (no materialized
-  // slots). If the tuples occupy no space, this implies all rows will be duplicates, so
-  // create a small hash table, IMPALA-2256.
-  // We always start with small pages in the hash table.
-  int64_t estimated_num_buckets = build_rows()->RowConsumesMemory() ?
-      HashTable::EstimateNumBuckets(build_rows()->num_rows()) : state->batch_size() * 2;
-  hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_,
-      true /* store_duplicates */,
-      parent_->child(1)->row_desc().tuple_descriptors().size(), build_rows(),
-      1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets));
-  if (!hash_tbl_->Init()) goto not_built;
-
-  do {
-    RETURN_IF_ERROR(build_rows_->GetNext(&batch, &eos, &indices));
-    DCHECK_EQ(batch.num_rows(), indices.size());
-    DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
-        << build_rows()->RowConsumesMemory();
-    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
-    SCOPED_TIMER(parent_->build_timer_);
-    if (parent_->insert_batch_fn_ != NULL) {
-      InsertBatchFn insert_batch_fn;
-      if (ctx->level() == 0) {
-        insert_batch_fn = parent_->insert_batch_fn_level0_;
-      } else {
-        insert_batch_fn = parent_->insert_batch_fn_;
-      }
-      DCHECK(insert_batch_fn != NULL);
-      if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) {
-        goto not_built;
-      }
-    } else {
-      if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) {
-        goto not_built;
-      }
-    }
-    RETURN_IF_ERROR(state->GetQueryStatus());
-    parent_->FreeLocalAllocations();
-    batch.Reset();
-  } while (!eos);
-
-  // The hash table fits in memory and is built.
-  DCHECK(*built);
-  DCHECK(hash_tbl_.get() != NULL);
-  is_spilled_ = false;
-  COUNTER_ADD(parent_->num_hash_buckets_, hash_tbl_->num_buckets());
-  return Status::OK();
-
-not_built:
-  *built = false;
-  if (hash_tbl_.get() != NULL) {
-    hash_tbl_->Close();
-    hash_tbl_.reset();
-  }
-  return Status::OK();
-}
-
-void PartitionedHashJoinNode::AllocateRuntimeFilters(RuntimeState* state) {
-  DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filters_.size() == 0)
-      << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
-  DCHECK(ht_ctx_.get() != NULL);
-  for (int i = 0; i < filters_.size(); ++i) {
-    filters_[i].local_bloom_filter =
-        state->filter_bank()->AllocateScratchBloomFilter(filters_[i].filter->id());
-  }
-}
-
-void PartitionedHashJoinNode::PublishRuntimeFilters(RuntimeState* state,
-    int64_t total_build_rows) {
-  int32_t num_enabled_filters = 0;
-  // Use total_build_rows to estimate FP-rate of each Bloom filter, and publish
-  // 'always-true' filters if it's too high. Doing so saves CPU at the coordinator,
-  // serialisation time, and reduces the cost of applying the filter at the scan - most
-  // significantly for per-row filters. However, the number of build rows could be a very
-  // poor estimate of the NDV - particularly if the filter expression is a function of
-  // several columns.
-  // TODO: Better heuristic.
-  for (const FilterContext& ctx: filters_) {
-    // TODO: Consider checking actual number of bits set in filter to compute FP rate.
-    // TODO: Consider checking this every few batches or so.
-    bool fp_rate_too_high =
-        state->filter_bank()->FpRateTooHigh(ctx.filter->filter_size(), total_build_rows);
-    state->filter_bank()->UpdateFilterFromLocal(ctx.filter->id(),
-        fp_rate_too_high ? BloomFilter::ALWAYS_TRUE_FILTER : ctx.local_bloom_filter);
-
-    num_enabled_filters += !fp_rate_too_high;
-  }
-
-  if (filters_.size() > 0) {
-    if (num_enabled_filters == filters_.size()) {
-      runtime_profile()->AppendExecOption(
-          Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(),
-              filters_.size() == 1 ? "" : "s"));
-    } else {
-      string exec_option = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
-          num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s",
-          filters_.size() - num_enabled_filters);
-      runtime_profile()->AppendExecOption(exec_option);
-    }
-  }
-}
-
-bool PartitionedHashJoinNode::AppendRowStreamFull(BufferedTupleStream* stream,
-    TupleRow* row, Status* status) {
-  while (status->ok()) {
-    // Check if the stream is still using small buffers and try to switch to IO-buffers.
-    if (stream->using_small_buffers()) {
-      bool got_buffer;
-      *status = stream->SwitchToIoBuffers(&got_buffer);
-      if (!status->ok()) return false;
-      if (got_buffer) {
-        if (LIKELY(stream->AddRow(row, status))) return true;
-        if (!status->ok()) return false;
-      }
-    }
-    // We ran out of memory. Pick a partition to spill.
-    Partition* spilled_partition;
-    *status = SpillPartition(&spilled_partition);
-    if (!status->ok()) return false;
-    if (stream->AddRow(row, status)) return true;
-    // Spilling one partition does not guarantee we can append a row. Keep
-    // spilling until we can append this row.
-  }
-  return false;
-}
-
-// TODO: Can we do better with a different spilling heuristic?
-Status PartitionedHashJoinNode::SpillPartition(Partition** spilled_partition) {
-  int64_t max_freed_mem = 0;
-  int partition_idx = -1;
-  *spilled_partition = NULL;
-
-  // Iterate over the partitions and pick the largest partition to spill.
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* candidate = hash_partitions_[i];
-    if (candidate->is_closed()) continue;
-    if (candidate->is_spilled()) continue;
-    int64_t mem = candidate->build_rows()->bytes_in_mem(false);
-    // TODO: What should we do here if probe_rows()->num_rows() > 0 ? We should be able
-    // to spill INNER JOINS, but not many of the other joins.
-    if (candidate->hash_tbl() != NULL) {
-      // IMPALA-1488: Do not spill partitions that already had matches, because we
-      // are going to lose information and return wrong results.
-      if (UNLIKELY(candidate->hash_tbl()->HasMatches())) continue;
-      mem += candidate->hash_tbl()->ByteSize();
-    }
-    if (mem > max_freed_mem) {
-      max_freed_mem = mem;
-      partition_idx = i;
-    }
-  }
-
-  if (partition_idx == -1) {
-    // Could not find a partition to spill. This means the mem limit was just too low.
-    return runtime_state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id());
-  }
-
-  VLOG(2) << "Spilling partition: " << partition_idx << endl << NodeDebugString();
-  RETURN_IF_ERROR(hash_partitions_[partition_idx]->Spill(false));
-  DCHECK(hash_partitions_[partition_idx]->probe_rows()->has_write_block());
-  hash_tbls_[partition_idx] = NULL;
-  *spilled_partition = hash_partitions_[partition_idx];
-  return Status::OK();
-}
-
-Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state) {
-  // Do a full scan of child(1) and partition the rows.
-  {
-    SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
-    RETURN_IF_ERROR(child(1)->Open(state));
+  RETURN_IF_ERROR(probe_rows_->PrepareForRead(true, &got_read_buffer));
+  if (!got_read_buffer) {
+    return parent_->mem_tracker()->MemLimitExceeded(parent_->runtime_state_,
+        Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, parent_->id_));
   }
-  RETURN_IF_ERROR(ProcessBuildInput(state, 0));
-
-  UpdateState(PROCESSING_PROBE);
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level) {
-  if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
-    return Status(TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH, id_,
-        MAX_PARTITION_DEPTH);
-  }
-
-  DCHECK(hash_partitions_.empty());
-  if (input_partition_ != NULL) {
-    DCHECK(input_partition_->build_rows() != NULL);
-    DCHECK_EQ(input_partition_->build_rows()->blocks_pinned(), 0) << NodeDebugString();
-    bool got_read_buffer;
-    RETURN_IF_ERROR(
-        input_partition_->build_rows()->PrepareForRead(true, &got_read_buffer));
-    if (!got_read_buffer) {
-      return mem_tracker()->MemLimitExceeded(
-          state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
-    }
-  }
-
-  for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    Partition* new_partition = new Partition(state, this, level);
-    DCHECK(new_partition != NULL);
-    hash_partitions_.push_back(partition_pool_->Add(new_partition));
-    RETURN_IF_ERROR(new_partition->build_rows()->Init(id(), runtime_profile(), true));
-    // Initialize a buffer for the probe here to make sure why have it if we need it.
-    // While this is not strictly necessary (there are some cases where we won't need this
-    // buffer), the benefit is low. Not grabbing this buffer means there is an additional
-    // buffer that could be used for the build side. However since this is only one
-    // buffer, there is only a small range of build input sizes where this is beneficial
-    // (an IO buffer size). It makes the logic much more complex to enable this
-    // optimization.
-    RETURN_IF_ERROR(new_partition->probe_rows()->Init(id(), runtime_profile(), false));
-  }
-  COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
-  COUNTER_SET(max_partition_level_, level);
-
-  DCHECK_EQ(build_batch_->num_rows(), 0);
-  bool eos = false;
-  int64_t total_build_rows = 0;
-  while (!eos) {
-    RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(QueryMaintenance(state));
-    // 'probe_expr_ctxs_' should have made no local allocations in this function.
-    DCHECK(!ExprContext::HasLocalAllocations(probe_expr_ctxs_))
-        << Expr::DebugString(probe_expr_ctxs_);
-    if (input_partition_ == NULL) {
-      // If we are still consuming batches from the build side.
-      {
-        SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
-        RETURN_IF_ERROR(child(1)->GetNext(state, build_batch_.get(), &eos));
-      }
-      COUNTER_ADD(build_row_counter_, build_batch_->num_rows());
-    } else {
-      // If we are consuming batches that have already been partitioned.
-      RETURN_IF_ERROR(input_partition_->build_rows()->GetNext(build_batch_.get(), &eos));
-    }
-    total_build_rows += build_batch_->num_rows();
-
-    SCOPED_TIMER(partition_build_timer_);
-    if (process_build_batch_fn_ == NULL) {
-      bool build_filters = ht_ctx_->level() == 0;
-      RETURN_IF_ERROR(ProcessBuildBatch(build_batch_.get(), build_filters));
+void PartitionedHashJoinNode::ProbePartition::Close(RowBatch* batch) {
+  if (IsClosed()) return;
+  if (probe_rows_ != NULL) {
+    if (batch == NULL) {
+      probe_rows_->Close();
     } else {
-      DCHECK(process_build_batch_fn_level0_ != NULL);
-      if (ht_ctx_->level() == 0) {
-        RETURN_IF_ERROR(
-            process_build_batch_fn_level0_(this, build_batch_.get(), true));
-      } else {
-        RETURN_IF_ERROR(process_build_batch_fn_(this, build_batch_.get(), false));
-      }
+      batch->AddTupleStream(probe_rows_.release());
     }
-    build_batch_->Reset();
-    DCHECK(!build_batch_->AtCapacity());
+    probe_rows_.reset();
   }
-
-  if (ht_ctx_->level() == 0) PublishRuntimeFilters(state, total_build_rows);
-
-  if (input_partition_ != NULL) {
-    // Done repartitioning build input, close it now.
-    input_partition_->build_rows_->Close();
-    input_partition_->build_rows_ = NULL;
-  }
-
-  stringstream ss;
-  ss << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:", id(),
-            hash_partitions_[0]->level_, total_build_rows);
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    double percent =
-        partition->build_rows()->num_rows() * 100 / static_cast<double>(total_build_rows);
-    ss << "  " << i << " "  << (partition->is_spilled() ? "spilled" : "not spilled")
-       << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
-       << "    #rows:" << partition->build_rows()->num_rows() << endl;
-    COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(percent));
-  }
-  VLOG(2) << ss.str();
-
-  COUNTER_ADD(num_build_rows_partitioned_, total_build_rows);
-  non_empty_build_ |= (total_build_rows > 0);
-  RETURN_IF_ERROR(BuildHashTables(state));
-  return Status::OK();
 }
 
 Status PartitionedHashJoinNode::NextProbeRowBatch(
     RuntimeState* state, RowBatch* out_batch) {
+  DCHECK_EQ(state_, PARTITIONING_PROBE);
   DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
   do {
     // Loop until we find a non-empty row batch.
@@ -777,6 +308,7 @@ Status PartitionedHashJoinNode::NextProbeRowBatch(
 Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
     RuntimeState* state, RowBatch* out_batch) {
   DCHECK(input_partition_ != NULL);
+  DCHECK(state_ == PROBING_SPILLED_PARTITION || state_ == REPARTITIONING_PROBE);
   probe_batch_->TransferResourceOwnership(out_batch);
   if (out_batch->AtCapacity()) {
     // The out_batch has resources associated with it that will be recycled on the
@@ -788,114 +320,108 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
   if (LIKELY(probe_rows->rows_returned() < probe_rows->num_rows())) {
     // Continue from the current probe stream.
     bool eos = false;
-    RETURN_IF_ERROR(input_partition_->probe_rows()->GetNext(probe_batch_.get(), &eos));
+    RETURN_IF_ERROR(probe_rows->GetNext(probe_batch_.get(), &eos));
     DCHECK_GT(probe_batch_->num_rows(), 0);
     ResetForProbe();
   } else {
-    // Done with this partition.
-    if (!input_partition_->is_spilled() &&
-        (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
-         join_op_ == TJoinOp::FULL_OUTER_JOIN)) {
+    // Finished processing spilled probe rows from this partition.
+    if (state_ == PROBING_SPILLED_PARTITION && NeedToProcessUnmatchedBuildRows()) {
+      // If the build partition was in memory, we are done probing this partition.
       // In case of right-outer, right-anti and full-outer joins, we move this partition
       // to the list of partitions that we need to output their unmatched build rows.
       DCHECK(output_build_partitions_.empty());
-      DCHECK(input_partition_->hash_tbl_.get() != NULL) << " id: " << id_
-           << " Build: " << input_partition_->build_rows()->num_rows()
-           << " Probe: " << probe_rows->num_rows() << endl
-           << GetStackTrace();
+      DCHECK(input_partition_->build_partition()->hash_tbl() != NULL)
+          << " id: " << id_
+          << " Build: " << input_partition_->build_partition()->build_rows()->num_rows()
+          << " Probe: " << probe_rows->num_rows() << endl
+          << GetStackTrace();
       hash_tbl_iterator_ =
-          input_partition_->hash_tbl_->FirstUnmatched(ht_ctx_.get());
-      output_build_partitions_.push_back(input_partition_);
+          input_partition_->build_partition()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
+      output_build_partitions_.push_back(input_partition_->build_partition());
     } else {
-      // In any other case, just close the input partition.
-      input_partition_->Close(out_batch);
-      input_partition_ = NULL;
+      // In any other case, just close the input build partition.
+      input_partition_->build_partition()->Close(out_batch);
     }
+    input_partition_->Close(out_batch);
+    input_partition_.reset();
     current_probe_row_ = NULL;
     probe_batch_pos_ = -1;
   }
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::PrepareNextPartition(RuntimeState* state) {
+Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
+    RuntimeState* state, bool* got_partition) {
+  VLOG(2) << "PrepareSpilledPartitionForProbe\n" << NodeDebugString();
   DCHECK(input_partition_ == NULL);
-  if (spilled_partitions_.empty()) return Status::OK();
-  VLOG(2) << "PrepareNextPartition\n" << NodeDebugString();
+  DCHECK_EQ(builder_->num_hash_partitions(), 0);
+  DCHECK(probe_hash_partitions_.empty());
+  if (spilled_partitions_.empty()) {
+    *got_partition = false;
+    return Status::OK();
+  }
 
-  input_partition_ = spilled_partitions_.front();
+  input_partition_ = std::move(spilled_partitions_.front());
   spilled_partitions_.pop_front();
-  DCHECK(input_partition_->is_spilled());
+  PhjBuilder::Partition* build_partition = input_partition_->build_partition();
+  DCHECK(build_partition->is_spilled());
 
-  // Reserve one buffer to read the probe side.
-  bool got_read_buffer;
-  RETURN_IF_ERROR(input_partition_->probe_rows()->PrepareForRead(true, &got_read_buffer));
-  if (!got_read_buffer) {
-    return mem_tracker()->MemLimitExceeded(
-        state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
-  }
-  ht_ctx_->set_level(input_partition_->level_);
-
-  int64_t mem_limit = mem_tracker()->SpareCapacity();
-  // Try to build a hash table on top the spilled build rows.
-  bool built = false;
-  int64_t estimated_memory = input_partition_->EstimatedInMemSize();
-  if (estimated_memory < mem_limit) {
-    ht_ctx_->set_level(input_partition_->level_);
-    RETURN_IF_ERROR(input_partition_->BuildHashTable(state, &built));
-  } else {
-    LOG(INFO) << "In hash join id=" << id_ << " the estimated needed memory ("
-        << estimated_memory << ") for partition " << input_partition_ << " with "
-        << input_partition_->build_rows()->num_rows() << " build rows is larger "
-        << " than the mem_limit (" << mem_limit << ").";
-  }
+  // Make sure we have a buffer to read the probe rows before we build the hash table.
+  RETURN_IF_ERROR(input_partition_->PrepareForRead());
+  ht_ctx_->set_level(build_partition->level());
+
+  // Try to build a hash table for the spilled build partition.
+  bool built;
+  RETURN_IF_ERROR(build_partition->BuildHashTable(&built));
 
   if (!built) {
     // This build partition still does not fit in memory, repartition.
-    UpdateState(REPARTITIONING);
-    DCHECK(input_partition_->is_spilled());
-    input_partition_->Spill(false);
-    ht_ctx_->set_level(input_partition_->level_ + 1);
-    int64_t num_input_rows = input_partition_->build_rows()->num_rows();
-    RETURN_IF_ERROR(ProcessBuildInput(state, input_partition_->level_ + 1));
+    UpdateState(REPARTITIONING_BUILD);
+
+    int next_partition_level = build_partition->level() + 1;
+    if (UNLIKELY(next_partition_level >= MAX_PARTITION_DEPTH)) {
+      return Status(TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH, id(),
+          MAX_PARTITION_DEPTH);
+    }
+    ht_ctx_->set_level(next_partition_level);
+
+    // Spill to free memory from hash tables and pinned streams for use in new partitions.
+    build_partition->Spill(BufferedTupleStream::UNPIN_ALL);
+    // Temporarily free up the probe buffer to use when repartitioning.
+    input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0) << NodeDebugString();
+    DCHECK_EQ(input_partition_->probe_rows()->blocks_pinned(), 0) << NodeDebugString();
+    int64_t num_input_rows = build_partition->build_rows()->num_rows();
+    RETURN_IF_ERROR(builder_->RepartitionBuildInput(
+        build_partition, next_partition_level, input_partition_->probe_rows()));
 
     // Check if there was any reduction in the size of partitions after repartitioning.
-    int64_t largest_partition = LargestSpilledPartition();
-    DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with "
-        "more rows than the input";
-    if (UNLIKELY(num_input_rows == largest_partition)) {
+    int64_t largest_partition_rows = builder_->LargestPartitionRows();
+    DCHECK_GE(num_input_rows, largest_partition_rows) << "Cannot have a partition with "
+                                                         "more rows than the input";
+    if (UNLIKELY(num_input_rows == largest_partition_rows)) {
       return Status(TErrorCode::PARTITIONED_HASH_JOIN_REPARTITION_FAILS, id_,
-          input_partition_->level_ + 1, num_input_rows);
+          next_partition_level, num_input_rows);
     }
+
+    RETURN_IF_ERROR(PrepareForProbe());
+    UpdateState(REPARTITIONING_PROBE);
   } else {
-    DCHECK(hash_partitions_.empty());
-    DCHECK(!input_partition_->is_spilled());
-    DCHECK(input_partition_->hash_tbl() != NULL);
+    DCHECK(!input_partition_->build_partition()->is_spilled());
+    DCHECK(input_partition_->build_partition()->hash_tbl() != NULL);
     // In this case, we did not have to partition the build again, we just built
     // a hash table. This means the probe does not have to be partitioned either.
     for (int i = 0; i < PARTITION_FANOUT; ++i) {
-      hash_tbls_[i] = input_partition_->hash_tbl();
+      hash_tbls_[i] = input_partition_->build_partition()->hash_tbl();
     }
     UpdateState(PROBING_SPILLED_PARTITION);
   }
 
-  COUNTER_ADD(num_repartitions_, 1);
   COUNTER_ADD(num_probe_rows_partitioned_, input_partition_->probe_rows()->num_rows());
+  *got_partition = true;
   return Status::OK();
 }
 
-int64_t PartitionedHashJoinNode::LargestSpilledPartition() const {
-  int64_t max_rows = 0;
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    DCHECK(partition != NULL) << i << " " << hash_partitions_.size();
-    if (partition->is_closed() || !partition->is_spilled()) continue;
-    int64_t rows = partition->build_rows()->num_rows();
-    rows += partition->probe_rows()->num_rows();
-    if (rows > max_rows) max_rows = rows;
-  }
-  return max_rows;
-}
-
 int PartitionedHashJoinNode::ProcessProbeBatch(
     const TJoinOp::type join_op, TPrefetchMode::type prefetch_mode,
     RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status) {
@@ -948,24 +474,26 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
 
   Status status = Status::OK();
   while (true) {
+    DCHECK(!*eos);
     DCHECK(status.ok());
     DCHECK_NE(state_, PARTITIONING_BUILD) << "Should not be in GetNext()";
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
 
-    if ((join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
-         join_op_ == TJoinOp::FULL_OUTER_JOIN) &&
-        !output_build_partitions_.empty())  {
-      // In case of right-outer, right-anti and full-outer joins, flush the remaining
-      // unmatched build rows of any partition we are done processing, before processing
-      // the next batch.
+    if (!output_build_partitions_.empty()) {
+      DCHECK(NeedToProcessUnmatchedBuildRows());
+
+      // Flush the remaining unmatched build rows of any partitions we are done
+      // processing before moving onto the next partition.
       OutputUnmatchedBuild(out_batch);
       if (!output_build_partitions_.empty()) break;
 
-      // Finished to output unmatched build rows, move to next partition.
-      DCHECK(hash_partitions_.empty());
-      RETURN_IF_ERROR(PrepareNextPartition(state));
-      if (input_partition_ == NULL) {
+      // Finished outputting unmatched build rows, move to next partition.
+      DCHECK_EQ(builder_->num_hash_partitions(), 0);
+      DCHECK(probe_hash_partitions_.empty());
+      bool got_partition;
+      RETURN_IF_ERROR(PrepareSpilledPartitionForProbe(state, &got_partition));
+      if (!got_partition) {
         *eos = true;
         break;
       }
@@ -974,7 +502,8 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
 
     if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
       // In this case, we want to output rows from the null aware partition.
-      if (null_aware_partition_ == NULL) {
+      if (builder_->null_aware_partition() == NULL) {
+        DCHECK(null_aware_probe_partition_ == NULL);
         *eos = true;
         break;
       }
@@ -985,14 +514,14 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
         continue;
       }
 
-      if (nulls_build_batch_.get() != NULL) {
+      if (nulls_build_batch_ != NULL) {
         RETURN_IF_ERROR(OutputNullAwareProbeRows(state, out_batch));
         if (out_batch->AtCapacity()) break;
         continue;
       }
     }
 
-    // Finish up the current batch.
+    // Finish processing rows in the current probe batch.
     if (probe_batch_pos_ != -1) {
       // Putting SCOPED_TIMER in ProcessProbeBatch() causes weird exception handling IR
       // in the xcompiled function, so call it here instead.
@@ -1026,10 +555,16 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
     }
 
     // Try to continue from the current probe side input.
-    if (input_partition_ == NULL) {
+    if (state_ == PARTITIONING_PROBE) {
+      DCHECK(input_partition_ == NULL);
       RETURN_IF_ERROR(NextProbeRowBatch(state, out_batch));
     } else {
-      RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
+      DCHECK(state_ == REPARTITIONING_PROBE || state_ == PROBING_SPILLED_PARTITION)
+          << state_;
+      DCHECK(probe_side_eos_);
+      if (input_partition_ != NULL) {
+        RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
+      }
     }
     // Free local allocations of the probe side expressions only after ExprValuesCache
     // has been reset.
@@ -1045,30 +580,34 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
     if (probe_batch_pos_ == 0) continue;
     DCHECK_EQ(probe_batch_pos_, -1);
 
-    // Finished up all probe rows for hash_partitions_.
-    RETURN_IF_ERROR(CleanUpHashPartitions(out_batch));
-    if (out_batch->AtCapacity()) break;
+    // Finished up all probe rows for 'hash_partitions_'. We may have already cleaned up
+    // the hash partitions, e.g. if we had to output some unmatched build rows below.
+    if (builder_->num_hash_partitions() != 0) {
+      RETURN_IF_ERROR(CleanUpHashPartitions(out_batch));
+      if (out_batch->AtCapacity()) break;
+    }
 
-    if ((join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
-         join_op_ == TJoinOp::FULL_OUTER_JOIN) &&
-        !output_build_partitions_.empty()) {
+    if (!output_build_partitions_.empty()) {
+      DCHECK(NeedToProcessUnmatchedBuildRows());
       // There are some partitions that need to flush their unmatched build rows.
-      continue;
+      OutputUnmatchedBuild(out_batch);
+      if (!output_build_partitions_.empty()) break;
     }
-    // Move onto the next partition.
-    RETURN_IF_ERROR(PrepareNextPartition(state));
+    // Move onto the next spilled partition.
+    bool got_partition;
+    RETURN_IF_ERROR(PrepareSpilledPartitionForProbe(state, &got_partition));
+    if (got_partition) continue; // Probe the spilled partition.
 
-    if (input_partition_ == NULL) {
-      if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-        RETURN_IF_ERROR(PrepareNullAwarePartition());
-      }
-      if (null_aware_partition_ == NULL) {
-        *eos = true;
-        break;
-      } else {
-        *eos = false;
-      }
+    if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+      // Prepare the null-aware partitions, then resume at the top of the loop to output
+      // the rows.
+      RETURN_IF_ERROR(PrepareNullAwarePartition());
+      continue;
     }
+    DCHECK(builder_->null_aware_partition() == NULL);
+
+    *eos = true;
+    break;
   }
 
   if (ReachedLimit()) *eos = true;
@@ -1077,8 +616,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
 
 void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
   SCOPED_TIMER(probe_timer_);
-  DCHECK(join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
-         join_op_ == TJoinOp::FULL_OUTER_JOIN);
+  DCHECK(NeedToProcessUnmatchedBuildRows());
   DCHECK(!output_build_partitions_.empty());
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
   const int num_conjuncts = conjunct_ctxs_.size();
@@ -1141,8 +679,9 @@ Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
 
 Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
     RowBatch* out_batch) {
-  DCHECK(null_aware_partition_ != NULL);
-  DCHECK(nulls_build_batch_.get() == NULL);
+  DCHECK(builder_->null_aware_partition() != NULL);
+  DCHECK(null_aware_probe_partition_ != NULL);
+  DCHECK(nulls_build_batch_ == NULL);
   DCHECK_NE(probe_batch_pos_, -1);
 
   if (probe_batch_pos_ == probe_batch_->num_rows()) {
@@ -1151,12 +690,12 @@ Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
     if (out_batch->AtCapacity()) return Status::OK();
     bool eos;
     RETURN_IF_ERROR(null_probe_rows_->GetNext(probe_batch_.get(), &eos));
-    if (probe_batch_->num_rows() == 0) {
+    if (probe_batch_->num_rows() == 0 && eos) {
       // All done.
-      null_aware_partition_->Close(out_batch);
-      null_aware_partition_ = NULL;
-      out_batch->AddTupleStream(null_probe_rows_);
-      null_probe_rows_ = NULL;
+      builder_->CloseNullAwarePartition(out_batch);
+      null_aware_probe_partition_->Close(out_batch);
+      null_aware_probe_partition_.reset();
+      out_batch->AddTupleStream(null_probe_rows_.release());
       return Status::OK();
     }
   }
@@ -1183,14 +722,54 @@ static Status NullAwareAntiJoinError(bool build) {
       " many NULLs on the $0 side to perform this join.", build ? "build" : "probe"));
 }
 
+Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
+  RuntimeState* state = runtime_state_;
+  unique_ptr<BufferedTupleStream> probe_rows = std::make_unique<BufferedTupleStream>(
+      state, child(0)->row_desc(), state->block_mgr(), builder_->block_mgr_client(),
+      false /* use_initial_small_buffers */, false /* read_write */);
+  Status status = probe_rows->Init(id(), runtime_profile(), false);
+  if (!status.ok()) goto error;
+  bool got_buffer;
+  status = probe_rows->PrepareForWrite(&got_buffer);
+  if (!status.ok()) goto error;
+  if (!got_buffer) {
+    status = state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id());
+    goto error;
+  }
+  null_aware_probe_partition_.reset(new ProbePartition(
+      state, this, builder_->null_aware_partition(), std::move(probe_rows)));
+  return Status::OK();
+
+error:
+  DCHECK(!status.ok());
+  // Ensure the temporary 'probe_rows' stream is closed correctly on error.
+  probe_rows->Close();
+  return status;
+}
+
+Status PartitionedHashJoinNode::InitNullProbeRows() {
+  RuntimeState* state = runtime_state_;
+  null_probe_rows_ = std::make_unique<BufferedTupleStream>(state, child(0)->row_desc(),
+      state->block_mgr(), builder_->block_mgr_client(),
+      false /* use_initial_small_buffers */, false /* read_write */);
+  RETURN_IF_ERROR(null_probe_rows_->Init(id(), runtime_profile(), false));
+  bool got_buffer;
+  RETURN_IF_ERROR(null_probe_rows_->PrepareForWrite(&got_buffer));
+  if (!got_buffer) {
+    return state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id());
+  }
+  return Status::OK();
+}
+
 Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
-  DCHECK(null_aware_partition_ != NULL);
-  DCHECK(nulls_build_batch_.get() == NULL);
+  DCHECK(builder_->null_aware_partition() != NULL);
+  DCHECK(null_aware_probe_partition_ != NULL);
+  DCHECK(nulls_build_batch_ == NULL);
   DCHECK_EQ(probe_batch_pos_, -1);
   DCHECK_EQ(probe_batch_->num_rows(), 0);
 
-  BufferedTupleStream* build_stream = null_aware_partition_->build_rows();
-  BufferedTupleStream* probe_stream = null_aware_partition_->probe_rows();
+  BufferedTupleStream* build_stream = builder_->null_aware_partition()->build_rows();
+  BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
 
   if (build_stream->num_rows() == 0) {
     // There were no build rows. Nothing to do. Just prepare to output the null
@@ -1219,14 +798,15 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
 
 Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     RowBatch* out_batch) {
-  DCHECK(null_aware_partition_ != NULL);
-  DCHECK(nulls_build_batch_.get() != NULL);
+  DCHECK(builder_->null_aware_partition() != NULL);
+  DCHECK(null_aware_probe_partition_ != NULL);
+  DCHECK(nulls_build_batch_ != NULL);
 
   ExprContext* const* join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
   int num_join_conjuncts = other_join_conjunct_ctxs_.size();
-  DCHECK(probe_batch_.get() != NULL);
+  DCHECK(probe_batch_ != NULL);
 
-  BufferedTupleStream* probe_stream = null_aware_partition_->probe_rows();
+  BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
   if (probe_batch_pos_ == probe_batch_->num_rows()) {
     probe_batch_pos_ = 0;
     probe_batch_->TransferResourceOwnership(out_batch);
@@ -1237,7 +817,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
     RETURN_IF_ERROR(probe_stream->GetNext(probe_batch_.get(), &eos));
 
     if (probe_batch_->num_rows() == 0) {
-      RETURN_IF_ERROR(EvaluateNullProbe(null_aware_partition_->build_rows()));
+      RETURN_IF_ERROR(EvaluateNullProbe(builder_->null_aware_partition()->build_rows()));
       nulls_build_batch_.reset();
       RETURN_IF_ERROR(PrepareNullAwareNullProbe());
       return Status::OK();
@@ -1269,100 +849,55 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
   return Status::OK();
 }
 
-// When this function is called, we've finished processing the current build input
-// (either from child(1) or from repartitioning a spilled partition). The build rows
-// have only been partitioned, we still need to build hash tables over them. Some
-// of the partitions could have already been spilled and attempting to build hash
-// tables over the non-spilled ones can cause them to spill.
-//
-// At the end of the function we'd like all partitions to either have a hash table
-// (and therefore not spilled) or be spilled. Partitions that have a hash table don't
-// need to spill on the probe side.
-//
-// This maps perfectly to a 0-1 knapsack where the weight is the memory to keep the
-// build rows and hash table and the value is the expected IO savings.
-// For now, we go with a greedy solution.
-//
-// TODO: implement the knapsack solution.
-Status PartitionedHashJoinNode::BuildHashTables(RuntimeState* state) {
-  DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);
-
-  // First loop over the partitions and build hash tables for the partitions that did
-  // not already spill.
-  for (Partition* partition: hash_partitions_) {
-    if (partition->build_rows()->num_rows() == 0) {
-      // This partition is empty, no need to do anything else.
-      partition->Close(NULL);
-      continue;
-    }
-
-    if (!partition->is_spilled()) {
-      bool built = false;
-      DCHECK(partition->build_rows()->is_pinned());
-      RETURN_IF_ERROR(partition->BuildHashTable(state, &built));
-      // If we did not have enough memory to build this hash table, we need to spill this
-      // partition (clean up the hash table, unpin build).
-      if (!built) RETURN_IF_ERROR(partition->Spill(true));
-    }
-  }
+Status PartitionedHashJoinNode::PrepareForProbe() {
+  DCHECK_EQ(builder_->num_hash_partitions(), PARTITION_FANOUT);
+  DCHECK(probe_hash_partitions_.empty());
 
-  // Collect all the spilled partitions that don't have an IO buffer. We need to reserve
-  // an IO buffer for those partitions. Reserving an IO buffer can cause more partitions
-  // to spill so this process is recursive.
-  list<Partition*> spilled_partitions;
-  for (Partition* partition: hash_partitions_) {
-    if (partition->is_closed()) continue;
-    if (partition->is_spilled() && partition->probe_rows()->using_small_buffers()) {
-      spilled_partitions.push_back(partition);
-    }
-  }
-  while (!spilled_partitions.empty()) {
-    Partition* partition = spilled_partitions.front();
-    spilled_partitions.pop_front();
-
-    while (true) {
-      bool got_buffer;
-      RETURN_IF_ERROR(partition->probe_rows()->SwitchToIoBuffers(&got_buffer));
-      if (got_buffer) break;
-      Partition* spilled_partition;
-      RETURN_IF_ERROR(SpillPartition(&spilled_partition));
-      DCHECK(spilled_partition->is_spilled());
-      if (spilled_partition->probe_rows()->using_small_buffers()) {
-        spilled_partitions.push_back(spilled_partition);
-      }
-    }
+  // Initialize the probe partitions, providing them with probe streams.
+  vector<unique_ptr<BufferedTupleStream>> probe_streams = builder_->TransferProbeStreams();
+  probe_hash_partitions_.resize(PARTITION_FANOUT);
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
+    if (build_partition->IsClosed() || !build_partition->is_spilled()) continue;
 
-    DCHECK(partition->probe_rows()->has_write_block());
-    DCHECK(!partition->probe_rows()->using_small_buffers());
+    DCHECK(!probe_streams.empty()) << "Builder should have created enough streams";
+    CreateProbePartition(i, std::move(probe_streams.back()));
+    probe_streams.pop_back();
   }
+  DCHECK(probe_streams.empty()) << "Builder should not have created extra streams";
 
-  // At this point, the partition is in one of these states:
-  // 1. closed. All done, no buffers in either the build or probe stream.
-  // 2. in_mem. The build side is pinned and has a hash table built.
-  // 3. spilled. The build side is fully unpinned and the probe side has an io
-  //    sized buffer.
-  for (Partition* partition: hash_partitions_) {
-    if (partition->hash_tbl() != NULL) partition->probe_rows()->Close();
+  // Initialize the hash_tbl_ caching array.
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    hash_tbls_[i] = builder_->hash_partition(i)->hash_tbl();
   }
 
-  // TODO: at this point we could have freed enough memory to pin and build some
-  // spilled partitions. This can happen, for example is there is a lot of skew.
-  // Partition 1: 10GB (pinned initially).
-  // Partition 2,3,4: 1GB (spilled during partitioning the build).
-  // In the previous step, we could have unpinned 10GB (because there was not enough
-  // memory to build a hash table over it) which can now free enough memory to
-  // build hash tables over the remaining 3 partitions.
-  // We start by spilling the largest partition though so the build input would have
-  // to be pretty pathological.
-  // Investigate if this is worthwhile.
-
-  // Initialize the hash_tbl_ caching array.
+  // Validate the state of the partitions.
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    hash_tbls_[i] = hash_partitions_[i]->hash_tbl();
+    PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
+    ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+    if (build_partition->IsClosed()) {
+      DCHECK(hash_tbls_[i] == NULL);
+      DCHECK(probe_partition == NULL);
+    } else if (build_partition->is_spilled()) {
+      DCHECK(hash_tbls_[i] == NULL);
+      DCHECK(probe_partition != NULL);
+    } else {
+      DCHECK(hash_tbls_[i] != NULL);
+      DCHECK(probe_partition == NULL);
+    }
   }
   return Status::OK();
 }
 
+void PartitionedHashJoinNode::CreateProbePartition(
+    int partition_idx, unique_ptr<BufferedTupleStream> probe_rows) {
+  DCHECK_GE(partition_idx, 0);
+  DCHECK_LT(partition_idx, probe_hash_partitions_.size());
+  DCHECK(probe_hash_partitions_[partition_idx] == NULL);
+  probe_hash_partitions_[partition_idx] = std::make_unique<ProbePartition>(runtime_state_,
+      this, builder_->hash_partition(partition_idx), std::move(probe_rows));
+}
+
 Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
   if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
     return Status::OK();
@@ -1411,48 +946,59 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
   // add them to the list of partitions that need to output any unmatched build rows.
   // This partition will be closed by the function that actually outputs unmatched build
   // rows.
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    if (partition->is_closed()) continue;
-    if (partition->is_spilled()) {
-      DCHECK(partition->hash_tbl() == NULL) << NodeDebugString();
-      // Unpin the build and probe stream to free up more memory. We need to free all
-      // memory so we can recurse the algorithm and create new hash partitions from
-      // spilled partitions.
-      RETURN_IF_ERROR(partition->build_rows()->UnpinStream(true));
-      RETURN_IF_ERROR(partition->probe_rows()->UnpinStream(true));
-
-      // Push new created partitions at the front. This means a depth first walk
+  DCHECK_EQ(builder_->num_hash_partitions(), PARTITION_FANOUT);
+  DCHECK_EQ(probe_hash_partitions_.size(), PARTITION_FANOUT);
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+    PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
+    if (build_partition->IsClosed()) {
+      DCHECK(probe_partition == NULL);
+      continue;
+    }
+    if (build_partition->is_spilled()) {
+      DCHECK(probe_partition != NULL);
+      DCHECK(build_partition->hash_tbl() == NULL) << NodeDebugString();
+      // Unpin the probe stream to free up more memory. We need to free all memory so we
+      // can recurse the algorithm and create new hash partitions from spilled partitions.
+      // TODO: we shouldn't need to unpin the build stream if we stop spilling
+      // while probing.
+      RETURN_IF_ERROR(
+          build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+      DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0);
+      RETURN_IF_ERROR(
+          probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+
+      // Push newly created partitions at the front. This means a depth first walk
       // (more finely partitioned partitions are processed first). This allows us
       // to delete blocks earlier and bottom out the recursion earlier.
-      spilled_partitions_.push_front(partition);
+      spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
     } else {
-      DCHECK_EQ(partition->probe_rows()->num_rows(), 0)
-        << "No probe rows should have been spilled for this partition.";
-      if (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
-          join_op_ == TJoinOp::FULL_OUTER_JOIN) {
+      DCHECK(probe_partition == NULL);
+      if (NeedToProcessUnmatchedBuildRows()) {
         if (output_build_partitions_.empty()) {
-          hash_tbl_iterator_ = partition->hash_tbl_->FirstUnmatched(ht_ctx_.get());
+          hash_tbl_iterator_ = build_partition->hash_tbl()->FirstUnmatched(ht_ctx_.get());
         }
-        output_build_partitions_.push_back(partition);
+        output_build_partitions_.push_back(build_partition);
       } else if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
         // For NAAJ, we need to try to match all the NULL probe rows with this partition
         // before closing it. The NULL probe rows could have come from any partition
         // so we collect them all and match them at the end.
-        RETURN_IF_ERROR(EvaluateNullProbe(partition->build_rows()));
-        partition->Close(batch);
+        RETURN_IF_ERROR(EvaluateNullProbe(build_partition->build_rows()));
+        build_partition->Close(batch);
       } else {
-        partition->Close(batch);
+        build_partition->Close(batch);
       }
     }
   }
 
   // Just finished evaluating the null probe rows with all the non-spilled build
   // partitions. Unpin this now to free this memory for repartitioning.
-  if (null_probe_rows_ != NULL) RETURN_IF_ERROR(null_probe_rows_->UnpinStream());
+  if (null_probe_rows_ != NULL)
+    RETURN_IF_ERROR(
+        null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
 
-  hash_partitions_.clear();
-  input_partition_ = NULL;
+  builder_->ClearHashPartitions();
+  probe_hash_partitions_.clear();
   return Status::OK();
 }
 
@@ -1465,17 +1011,30 @@ void PartitionedHashJoinNode::AddToDebugString(int indent, stringstream* out) co
   *out << ")";
 }
 
-void PartitionedHashJoinNode::UpdateState(HashJoinState s) {
-  state_ = s;
+void PartitionedHashJoinNode::UpdateState(HashJoinState next_state) {
+  // Validate the state transition.
+  switch (state_) {
+    case PARTITIONING_BUILD: DCHECK_EQ(next_state, PARTITIONING_PROBE); break;
+    case PARTITIONING_PROBE:
+    case REPARTITIONING_PROBE:
+    case PROBING_SPILLED_PARTITION:
+      DCHECK(
+          next_state == REPARTITIONING_BUILD || next_state == PROBING_SPILLED_PARTITION);
+      break;
+    case REPARTITIONING_BUILD: DCHECK_EQ(next_state, REPARTITIONING_PROBE); break;
+    default: DCHECK(false) << "Invalid state " << state_;
+  }
+  state_ = next_state;
   VLOG(2) << "Transitioned State:" << endl << NodeDebugString();
 }
 
 string PartitionedHashJoinNode::PrintState() const {
   switch (state_) {
     case PARTITIONING_BUILD: return "PartitioningBuild";
-    case PROCESSING_PROBE: return "ProcessingProbe";
-    case PROBING_SPILLED_PARTITION: return "ProbingSpilledPartitions";
-    case REPARTITIONING: return "Repartitioning";
+    case PARTITIONING_PROBE: return "PartitioningProbe";
+    case PROBING_SPILLED_PARTITION: return "ProbingSpilledPartition";
+    case REPARTITIONING_BUILD: return "RepartitioningBuild";
+    case REPARTITIONING_PROBE: return "RepartitioningProbe";
     default: DCHECK(false);
   }
   return "";
@@ -1485,58 +1044,41 @@ string PartitionedHashJoinNode::NodeDebugString() const {
   stringstream ss;
   ss << "PartitionedHashJoinNode (id=" << id() << " op=" << join_op_
      << " state=" << PrintState()
-     << " #partitions=" << hash_partitions_.size()
-     << " #spilled_partitions=" << spilled_partitions_.size()
-     << ")" << endl;
-
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    ss << i << ": ptr=" << partition;
-    DCHECK(partition != NULL);
-    if (partition->is_closed()) {
-      ss << " Closed" << endl;
-      continue;
-    }
-    if (partition->is_spilled()) {
-      ss << " Spilled" << endl;
-    }
-    DCHECK(partition->build_rows() != NULL);
-    DCHECK(partition->probe_rows() != NULL);
-    ss << endl
-       << "   Build Rows: " << partition->build_rows()->num_rows()
-       << " (Blocks pinned: " << partition->build_rows()->blocks_pinned() << ")"
-       << endl;
-    ss << "   Probe Rows: " << partition->probe_rows()->num_rows()
-       << " (Blocks pinned: " << partition->probe_rows()->blocks_pinned() << ")"
+     << " #hash_partitions=" << builder_->num_hash_partitions()
+     << " #spilled_partitions=" << spilled_partitions_.size() << ")" << endl;
+
+  ss << "PhjBuilder: " << builder_->DebugString();
+
+  ss << "Probe hash partitions: " << probe_hash_partitions_.size() << ":" << endl;
+  for (int i = 0; i < probe_hash_partitions_.size(); ++i) {
+    ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+    ss << "  Probe hash partition " << i << ": probe ptr=" << probe_partition
+       << "    Probe Rows: " << probe_partition->probe_rows()->num_rows()
+       << "    (Blocks pinned: " << probe_partition->probe_rows()->blocks_pinned() << ")"
        << endl;
-    if (partition->hash_tbl() != NULL) {
-      ss << "   Hash Table Rows: " << partition->hash_tbl()->size() << endl;
-    }
   }
 
   if (!spilled_partitions_.empty()) {
     ss << "SpilledPartitions" << endl;
-    for (list<Partition*>::const_iterator it = spilled_partitions_.begin();
-        it != spilled_partitions_.end(); ++it) {
-      DCHECK((*it)->is_spilled());
-      DCHECK((*it)->hash_tbl() == NULL);
-      DCHECK((*it)->build_rows() != NULL);
-      DCHECK((*it)->probe_rows() != NULL);
-      ss << "  Partition=" << *it << endl
-         << "   Spilled Build Rows: "
-         << (*it)->build_rows()->num_rows() << endl
-         << "   Spilled Probe Rows: "
-         << (*it)->probe_rows()->num_rows() << endl;
+    for (const unique_ptr<ProbePartition>& probe_partition : spilled_partitions_) {
+      PhjBuilder::Partition* build_partition = probe_partition->build_partition();
+      DCHECK(build_partition->is_spilled());
+      DCHECK(build_partition->hash_tbl() == NULL);
+      DCHECK(build_partition->build_rows() != NULL);
+      DCHECK(probe_partition->probe_rows() != NULL);
+      ss << "  Partition=" << probe_partition.get() << endl
+         << "   Spilled Build Rows: " << build_partition->build_rows()->num_rows() << endl
+         << "   Spilled Probe Rows: " << probe_partition->probe_rows()->num_rows()
+         << endl;
     }
   }
   if (input_partition_ != NULL) {
-    DCHECK(input_partition_->build_rows() != NULL);
+    DCHECK(input_partition_->build_partition()->build_rows() != NULL);
     DCHECK(input_partition_->probe_rows() != NULL);
-    ss << "InputPartition: " << input_partition_ << endl
+    ss << "InputPartition: " << input_partition_.get() << endl
        << "   Spilled Build Rows: "
-       << input_partition_->build_rows()->num_rows() << endl
-       << "   Spilled Probe Rows: "
-       << input_partition_->probe_rows()->num_rows() << endl;
+       << input_partition_->build_partition()->build_rows()->num_rows() << endl
+       << "   Spilled Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl;
   } else {
     ss << "InputPartition: NULL" << endl;
   }
@@ -1648,81 +1190,14 @@ Status PartitionedHashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* codegen,
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
-    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
-
-  Function* process_build_batch_fn =
-      codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true);
-  DCHECK(process_build_batch_fn != NULL);
-
-  // Replace call sites
-  int replaced = codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn,
-      "EvalBuildRow");
-  DCHECK_EQ(replaced, 1);
-
-  // Replace some hash table parameters with constants.
-  HashTableCtx::HashTableReplacedConstants replaced_constants;
-  const bool stores_duplicates = true;
-  const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state, stores_duplicates,
-      num_build_tuples, process_build_batch_fn, &replaced_constants));
-  DCHECK_GE(replaced_constants.stores_nulls, 1);
-  DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
-  DCHECK_EQ(replaced_constants.stores_duplicates, 0);
-  DCHECK_EQ(replaced_constants.stores_tuples, 0);
-  DCHECK_EQ(replaced_constants.quadratic_probing, 0);
-
-  Function* process_build_batch_fn_level0 =
-      codegen->CloneFunction(process_build_batch_fn);
-
-  // Always build runtime filters at level0 (if there are any).
-  // Note that the first argument of this function is the return value.
-  Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 3);
-  build_filters_l0_arg->replaceAllUsesWith(
-      ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));
-
-  // process_build_batch_fn_level0 uses CRC hash if available,
-  replaced = codegen->ReplaceCallSites(process_build_batch_fn_level0, hash_fn, "HashRow");
-  DCHECK_EQ(replaced, 1);
-
-  // process_build_batch_fn uses murmur
-  replaced = codegen->ReplaceCallSites(process_build_batch_fn, murmur_hash_fn, "HashRow");
-  DCHECK_EQ(replaced, 1);
-
-  // Never build filters after repartitioning, as all rows have already been added to the
-  // filters during the level0 build. Note that the first argument of this function is the
-  // return value.
-  Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 3);
-  build_filters_arg->replaceAllUsesWith(
-      ConstantInt::get(Type::getInt1Ty(codegen->context()), false));
-
-  // Finalize ProcessBuildBatch functions
-  process_build_batch_fn = codegen->FinalizeFunction(process_build_batch_fn);
-  if (process_build_batch_fn == NULL) {
-    return Status("Codegen'd PartitionedHashJoinNode::ProcessBuildBatch() function "
-        "failed verification, see log");
-  }
-  process_build_batch_fn_level0 =
-      codegen->FinalizeFunction(process_build_batch_fn_level0);
-  if (process_build_batch_fn == NULL) {
-    return Status("Codegen'd level-zero PartitionedHashJoinNode::ProcessBuildBatch() "
-        "function failed verification, see log");
-  }
-
-  // Register native function pointers
-  codegen->AddFunctionToJit(process_build_batch_fn,
-                            reinterpret_cast<void**>(&process_build_batch_fn_));
-  codegen->AddFunctionToJit(process_build_batch_fn_level0,
-                            reinterpret_cast<void**>(&process_build_batch_fn_level0_));
-  return Status::OK();
-}
-
-Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
-    RuntimeState* state, Function* hash_fn, Function* murmur_hash_fn) {
+Status PartitionedHashJoinNode::CodegenProcessProbeBatch(RuntimeState* state) {
   LlvmCodeGen* codegen;
   RETURN_IF_ERROR(state->GetCodegen(&codegen));
+  // Codegen for hashing rows
+  Function* hash_fn;
+  Function* murmur_hash_fn;
+  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(state, false, &hash_fn));
+  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(state, true, &murmur_hash_fn));
 
   // Get cross compiled function
   IRFunction::Type ir_fn = IRFunction::FN_END;
@@ -1881,65 +1356,3 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
                             reinterpret_cast<void**>(&process_probe_batch_fn_level0_));
   return Status::OK();
 }
-
-Status PartitionedHashJoinNode::CodegenInsertBatch(RuntimeState* state,
-    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
-
-  Function* insert_batch_fn = codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true);
-  Function* build_equals_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(state, true, &build_equals_fn));
-
-  // Replace the parameter 'prefetch_mode' with constant.
-  Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
-  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
-  DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
-  DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
-  prefetch_mode_arg->replaceAllUsesWith(
-      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
-
-  // Use codegen'd EvalBuildRow() function
-  int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
-  DCHECK_EQ(replaced, 1);
-
-  // Use codegen'd Equals() function
-  replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals");
-  DCHECK_EQ(replaced, 1);
-
-  // Replace hash-table parameters with constants.
-  HashTableCtx::HashTableReplacedConstants replaced_constants;
-  const bool stores_duplicates = true;
-  const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state, stores_duplicates,
-      num_build_tuples, insert_batch_fn, &replaced_constants));
-  DCHECK_GE(replaced_constants.stores_nulls, 1);
-  DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
-  DCHECK_GE(replaced_constants.stores_duplicates, 1);
-  DCHECK_GE(replaced_constants.stores_tuples, 1);
-  DCHECK_GE(replaced_constants.quadratic_probing, 1);
-
-  Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn);
-
-  // Use codegen'd hash functions
-  replaced = codegen->ReplaceCallSites(insert_batch_fn_level0, hash_fn, "HashRow");
-  DCHECK_EQ(replaced, 1);
-  replaced = codegen->ReplaceCallSites(insert_batch_fn, murmur_hash_fn, "HashRow");
-  DCHECK_EQ(replaced, 1);
-
-  insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn);
-  if (insert_batch_fn == NULL) {
-    return Status("PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd "
-        "InsertBatch() function failed verification, see log");
-  }
-  insert_batch_fn_level0 = codegen->FinalizeFunction(insert_batch_fn_level0);
-  if (insert_batch_fn_level0 == NULL) {
-    return Status("PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd zero-level "
-        "InsertBatch() function failed verification, see log");
-  }
-
-  codegen->AddFunctionToJit(insert_batch_fn, reinterpret_cast<void**>(&insert_batch_fn_));
-  codegen->AddFunctionToJit(insert_batch_fn_level0,
-      reinterpret_cast<void**>(&insert_batch_fn_level0_));
-  return Status::OK();
-}


[6/6] incubator-impala git commit: IMPALA-3201: in-memory buffer pool implementation

Posted by ta...@apache.org.
IMPALA-3201: in-memory buffer pool implementation

This patch implements basic in-memory buffer management, with
reservations managed by ReservationTrackers.

Locks are fine-grained so that the buffer pool can scale to many
concurrent queries.

Includes basic tests for buffer pool setup, allocation and reservations.

Change-Id: I4bda61c31cc02d26bc83c3d458c835b0984b86a0
Reviewed-on: http://gerrit.cloudera.org:8080/4070
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/241c7e01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/241c7e01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/241c7e01

Branch: refs/heads/master
Commit: 241c7e01978f180012453b0a4ff6d061ca6d5093
Parents: 9cee2b5
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Aug 19 17:41:45 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Sep 28 23:38:20 2016 +0000

----------------------------------------------------------------------
 be/src/bufferpool/CMakeLists.txt      |   1 +
 be/src/bufferpool/buffer-allocator.cc |  39 +++
 be/src/bufferpool/buffer-allocator.h  |  48 ++++
 be/src/bufferpool/buffer-pool-test.cc | 381 +++++++++++++++++++++++++++--
 be/src/bufferpool/buffer-pool.cc      | 361 ++++++++++++++++++++++++++-
 be/src/bufferpool/buffer-pool.h       | 159 +++++++++---
 be/src/util/internal-queue.h          |  11 +
 common/thrift/generate_error_codes.py |   4 +-
 8 files changed, 934 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/CMakeLists.txt b/be/src/bufferpool/CMakeLists.txt
index 69c4e4a..2f056e0 100644
--- a/be/src/bufferpool/CMakeLists.txt
+++ b/be/src/bufferpool/CMakeLists.txt
@@ -22,6 +22,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
 
 add_library(BufferPool
+  buffer-allocator.cc
   buffer-pool.cc
   reservation-tracker.cc
 )

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-allocator.cc b/be/src/bufferpool/buffer-allocator.cc
new file mode 100644
index 0000000..27bd788
--- /dev/null
+++ b/be/src/bufferpool/buffer-allocator.cc
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bufferpool/buffer-allocator.h"
+
+#include "util/bit-util.h"
+
+namespace impala {
+
+BufferAllocator::BufferAllocator(int64_t min_buffer_len)
+  : min_buffer_len_(min_buffer_len) {}
+
+Status BufferAllocator::Allocate(int64_t len, uint8_t** buffer) {
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+
+  *buffer = reinterpret_cast<uint8_t*>(malloc(len));
+  if (*buffer == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len);
+  return Status::OK();
+}
+
+void BufferAllocator::Free(uint8_t* buffer, int64_t len) {
+  free(buffer);
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-allocator.h b/be/src/bufferpool/buffer-allocator.h
new file mode 100644
index 0000000..c3e0c70
--- /dev/null
+++ b/be/src/bufferpool/buffer-allocator.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_BUFFER_ALLOCATOR_H
+#define IMPALA_BUFFER_ALLOCATOR_H
+
+#include "common/status.h"
+
+namespace impala {
+
+/// The underlying memory allocator for the buffer pool. All buffers are allocated through
+/// the BufferPool's BufferAllocator. The allocator only handles allocating buffers that
+/// are power-of-two multiples of the minimum buffer length.
+///
+/// TODO:
+/// * Allocate memory with mmap() instead of malloc().
+/// * Implement free lists in the allocator or external to the allocator.
+class BufferAllocator {
+ public:
+  BufferAllocator(int64_t min_buffer_len);
+
+  /// Allocate memory for a buffer of 'len' bytes. 'len' must be a power-of-two multiple
+  /// of the minimum buffer length.
+  Status Allocate(int64_t len, uint8_t** buffer);
+
+  /// Free the memory for a previously-allocated buffer.
+  void Free(uint8_t* buffer, int64_t len);
+
+ private:
+  const int64_t min_buffer_len_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool-test.cc b/be/src/bufferpool/buffer-pool-test.cc
index cdb163d..d45f017 100644
--- a/be/src/bufferpool/buffer-pool-test.cc
+++ b/be/src/bufferpool/buffer-pool-test.cc
@@ -28,6 +28,7 @@
 #include "bufferpool/reservation-tracker.h"
 #include "common/init.h"
 #include "common/object-pool.h"
+#include "testutil/death-test-util.h"
 #include "testutil/test-macros.h"
 
 #include "common/names.h"
@@ -51,13 +52,16 @@ class BufferPoolTest : public ::testing::Test {
   }
 
   /// The minimum buffer size used in most tests.
-  const static int64_t TEST_PAGE_LEN = 1024;
+  const static int64_t TEST_BUFFER_LEN = 1024;
 
   /// Test helper to simulate registering then deregistering a number of queries with
   /// the given initial reservation and reservation limit.
   void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries,
       int64_t initial_query_reservation, int64_t query_reservation_limit);
 
+  /// Create and destroy a page multiple times.
+  void CreatePageLoop(BufferPool* pool, ReservationTracker* parent_tracker, int num_ops);
+
  protected:
   static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; }
 
@@ -85,7 +89,7 @@ class BufferPoolTest : public ::testing::Test {
   SpinLock query_reservations_lock_;
 };
 
-const int64_t BufferPoolTest::TEST_PAGE_LEN;
+const int64_t BufferPoolTest::TEST_BUFFER_LEN;
 
 void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi,
     int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) {
@@ -156,7 +160,7 @@ TEST_F(BufferPoolTest, BasicRegistration) {
   int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
   global_reservations_.InitRootTracker(NewProfile(), total_mem);
 
-  BufferPool pool(TEST_PAGE_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
 
   RegisterQueriesAndClients(
       &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit);
@@ -179,7 +183,7 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) {
   int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
   global_reservations_.InitRootTracker(NewProfile(), total_mem);
 
-  BufferPool pool(TEST_PAGE_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
 
   // Launch threads, each with a different set of query IDs.
   thread_group workers;
@@ -195,32 +199,355 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) {
   global_reservations_.Close();
 }
 
-/// Test that reservation setup fails if the initial buffers cannot be fulfilled.
-TEST_F(BufferPoolTest, QueryReservationsUnfulfilled) {
-  Status status;
-  int num_queries = 128;
-  int64_t reservation_per_query = 128;
-  // Won't be able to fulfill initial reservation for last query.
-  int64_t total_mem = num_queries * reservation_per_query - 1;
-  global_reservations_.InitRootTracker(NewProfile(), total_mem);
+/// Test basic page handle creation.
+TEST_F(BufferPoolTest, PageCreation) {
+  // Allocate many pages, each a power-of-two multiple of the minimum page length.
+  int num_pages = 16;
+  int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1);
+  int64_t total_mem = 2 * 2 * max_page_len;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
+  ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  vector<BufferPool::PageHandle> handles(num_pages);
+
+  // Create pages of various valid sizes.
+  for (int i = 0; i < num_pages; ++i) {
+    int size_multiple = 1 << i;
+    int64_t page_len = TEST_BUFFER_LEN * size_multiple;
+    int64_t used_before = client_tracker->GetUsedReservation();
+    ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
+    ASSERT_TRUE(handles[i].is_open());
+    ASSERT_TRUE(handles[i].is_pinned());
+    ASSERT_TRUE(handles[i].buffer_handle() != NULL);
+    ASSERT_TRUE(handles[i].data() != NULL);
+    ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
+    ASSERT_EQ(handles[i].len(), page_len);
+    ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
+    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + page_len);
+  }
 
-  for (int i = 0; i < num_queries; ++i) {
-    ReservationTracker* query_tracker = GetQueryReservationTracker(i);
-    query_tracker->InitChildTracker(
-        NewProfile(), &global_reservations_, NULL, 2 * reservation_per_query);
-    bool got_initial_reservation =
-        query_tracker->IncreaseReservationToFit(reservation_per_query);
-    if (i < num_queries - 1) {
-      ASSERT_TRUE(got_initial_reservation);
-    } else {
-      ASSERT_FALSE(got_initial_reservation);
-
-      // Getting the initial reservation should succeed after freeing up buffers from
-      // other query.
-      GetQueryReservationTracker(i - 1)->Close();
-      ASSERT_TRUE(query_tracker->IncreaseReservationToFit(reservation_per_query));
+  // Close the handles and check memory consumption.
+  for (int i = 0; i < num_pages; ++i) {
+    int64_t used_before = client_tracker->GetUsedReservation();
+    int page_len = handles[i].len();
+    pool.DestroyPage(&client, &handles[i]);
+    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - page_len);
+  }
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+
+  // All the reservations should be released at this point.
+  DCHECK_EQ(global_reservations_.GetReservation(), 0);
+  global_reservations_.Close();
+}
+
+TEST_F(BufferPoolTest, BufferAllocation) {
+  // Allocate many buffers, each a power-of-two multiple of the minimum buffer length.
+  int num_buffers = 16;
+  int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
+  int64_t total_mem = 2 * 2 * max_buffer_len;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  vector<BufferPool::BufferHandle> handles(num_buffers);
+
+  // Create buffers of various valid sizes.
+  for (int i = 0; i < num_buffers; ++i) {
+    int size_multiple = 1 << i;
+    int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
+    int64_t used_before = client_tracker->GetUsedReservation();
+    ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    ASSERT_TRUE(handles[i].is_open());
+    ASSERT_TRUE(handles[i].data() != NULL);
+    ASSERT_EQ(handles[i].len(), buffer_len);
+    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len);
+  }
+
+  // Close the handles and check memory consumption.
+  for (int i = 0; i < num_buffers; ++i) {
+    int64_t used_before = client_tracker->GetUsedReservation();
+    int buffer_len = handles[i].len();
+    pool.FreeBuffer(&client, &handles[i]);
+    DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len);
+  }
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+
+  // All the reservations should be released at this point.
+  DCHECK_EQ(global_reservations_.GetReservation(), 0);
+  global_reservations_.Close();
+}
+
+/// Test transfer of buffer handles between clients.
+TEST_F(BufferPoolTest, BufferTransfer) {
+  // Each client needs to have enough reservation for a buffer.
+  const int num_clients = 5;
+  int64_t total_mem = num_clients * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  ReservationTracker client_trackers[num_clients];
+  BufferPool::Client clients[num_clients];
+  BufferPool::BufferHandle handles[num_clients];
+  for (int i = 0; i < num_clients; ++i) {
+    client_trackers[i].InitChildTracker(
+        NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
+    ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
+    ASSERT_OK(pool.RegisterClient("test client", &client_trackers[i], &clients[i]));
+  }
+
+  // Transfer the buffer around between the clients repeatedly in a circle.
+  ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0]));
+  uint8_t* data = handles[0].data();
+  for (int iter = 0; iter < 10; ++iter) {
+    for (int client = 0; client < num_clients; ++client) {
+      int next_client = (client + 1) % num_clients;
+      ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client],
+          &clients[next_client], &handles[next_client]));
+      // Check that the transfer left things in a consistent state.
+      ASSERT_FALSE(handles[client].is_open());
+      ASSERT_EQ(0, client_trackers[client].GetUsedReservation());
+      ASSERT_TRUE(handles[next_client].is_open());
+      ASSERT_EQ(TEST_BUFFER_LEN, client_trackers[next_client].GetUsedReservation());
+      // The same underlying buffer should be used.
+      ASSERT_EQ(data, handles[next_client].data());
     }
   }
+
+  pool.FreeBuffer(&clients[0], &handles[0]);
+  for (int i = 0; i < num_clients; ++i) {
+    pool.DeregisterClient(&clients[i]);
+    client_trackers[i].Close();
+  }
+  DCHECK_EQ(global_reservations_.GetReservation(), 0);
+  global_reservations_.Close();
+}
+
+/// Test basic pinning and unpinning.
+TEST_F(BufferPoolTest, Pin) {
+  int64_t total_mem = TEST_BUFFER_LEN * 1024;
+  // Set up client with enough reservation to pin twice.
+  int64_t child_reservation = TEST_BUFFER_LEN * 2;
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(
+      NewProfile(), &global_reservations_, NULL, child_reservation);
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  BufferPool::PageHandle handle1, handle2;
+
+  // Can pin two minimum sized pages.
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+  ASSERT_TRUE(handle1.is_open());
+  ASSERT_TRUE(handle1.is_pinned());
+  ASSERT_TRUE(handle1.data() != NULL);
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+  ASSERT_TRUE(handle2.is_open());
+  ASSERT_TRUE(handle2.is_pinned());
+  ASSERT_TRUE(handle2.data() != NULL);
+
+  pool.Unpin(&client, &handle2);
+  ASSERT_FALSE(handle2.is_pinned());
+
+  // Can pin minimum-sized page twice.
+  ASSERT_OK(pool.Pin(&client, &handle1));
+  ASSERT_TRUE(handle1.is_pinned());
+  // Have to unpin twice.
+  pool.Unpin(&client, &handle1);
+  ASSERT_TRUE(handle1.is_pinned());
+  pool.Unpin(&client, &handle1);
+  ASSERT_FALSE(handle1.is_pinned());
+
+  // Can pin double-sized page only once.
+  BufferPool::PageHandle double_handle;
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle));
+  ASSERT_TRUE(double_handle.is_open());
+  ASSERT_TRUE(double_handle.is_pinned());
+  ASSERT_TRUE(double_handle.data() != NULL);
+
+  // Destroy the pages - test destroying both pinned and unpinned.
+  pool.DestroyPage(&client, &handle1);
+  pool.DestroyPage(&client, &handle2);
+  pool.DestroyPage(&client, &double_handle);
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+}
+
+/// Creating a page or pinning without sufficient reservation should DCHECK.
+TEST_F(BufferPoolTest, PinWithoutReservation) {
+  int64_t total_mem = TEST_BUFFER_LEN * 1024;
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(
+      NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  BufferPool::PageHandle handle;
+  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), "");
+
+  // Should succeed after increasing reservation.
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
+
+  // But we can't pin again.
+  IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &handle), "");
+
+  pool.DestroyPage(&client, &handle);
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+}
+
+/// Test that ExtractBuffer() moves a pinned page's buffer into a BufferHandle, closes
+/// the page handle and leaves the client using reservation for exactly one buffer.
+TEST_F(BufferPoolTest, ExtractBuffer) {
+  int64_t total_mem = TEST_BUFFER_LEN * 1024;
+  // Set up client with enough reservation for two buffers/pins.
+  int64_t child_reservation = TEST_BUFFER_LEN * 2;
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(
+      NewProfile(), &global_reservations_, NULL, child_reservation);
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  BufferPool::PageHandle page;
+  BufferPool::BufferHandle buffer;
+
+  // Test basic buffer extraction.
+  for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) {
+    ASSERT_OK(pool.CreatePage(&client, len, &page));
+    uint8_t* page_data = page.data();
+    pool.ExtractBuffer(&client, &page, &buffer);
+    ASSERT_FALSE(page.is_open());
+    ASSERT_TRUE(buffer.is_open());
+    ASSERT_EQ(len, buffer.len());
+    ASSERT_EQ(page_data, buffer.data());
+    ASSERT_EQ(len, client_tracker->GetUsedReservation());
+    pool.FreeBuffer(&client, &buffer);
+    ASSERT_EQ(0, client_tracker->GetUsedReservation());
+  }
+
+  // Test that ExtractBuffer() accounts correctly for pin count > 1.
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
+  uint8_t* page_data = page.data();
+  ASSERT_OK(pool.Pin(&client, &page));
+  ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation());
+  pool.ExtractBuffer(&client, &page, &buffer);
+  ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation());
+  ASSERT_FALSE(page.is_open());
+  ASSERT_TRUE(buffer.is_open());
+  ASSERT_EQ(TEST_BUFFER_LEN, buffer.len());
+  ASSERT_EQ(page_data, buffer.data());
+  pool.FreeBuffer(&client, &buffer);
+  ASSERT_EQ(0, client_tracker->GetUsedReservation());
+
+  // Test that ExtractBuffer() DCHECKs for unpinned pages.
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
+  pool.Unpin(&client, &page);
+  IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), "");
+  pool.DestroyPage(&client, &page);
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+}
+
+// Test concurrent creation and destruction of pages.
+TEST_F(BufferPoolTest, ConcurrentPageCreation) {
+  int ops_per_thread = 1024;
+  int num_threads = 64;
+  // Need enough buffers for all initial reservations. Byte counts are int64_t, as in
+  // the ExtractBuffer test, so the total is never narrowed to int.
+  int64_t total_mem = num_threads * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+
+  // Launch threads, each of which registers its own client and reservation tracker.
+  thread_group workers;
+  for (int i = 0; i < num_threads; ++i) {
+    workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, &pool,
+        &global_reservations_, ops_per_thread)));
+  }
+
+  // Build debug string to test concurrent iteration over pages_ list.
+  for (int i = 0; i < 64; ++i) {
+    LOG(INFO) << pool.DebugString();
+  }
+  workers.join_all();
+
+  // All the reservations should be released at this point. Use a gtest expectation
+  // rather than a DCHECK so the check is also made in release builds, where DCHECKs
+  // are compiled out. EXPECT (not ASSERT) so that Close() below still runs on failure.
+  EXPECT_EQ(0, global_reservations_.GetChildReservations());
+  global_reservations_.Close();
+}
+
+/// Thread body for the ConcurrentPageCreation test. Registers a client backed by a
+/// child tracker of 'parent_tracker', then runs 'num_ops' iterations that each reserve
+/// TEST_BUFFER_LEN bytes, create a page, unpin it, pin it again, destroy it and give
+/// the reservation back. Deregisters the client and closes the tracker when done.
+void BufferPoolTest::CreatePageLoop(
+    BufferPool* pool, ReservationTracker* parent_tracker, int num_ops) {
+  ReservationTracker client_tracker;
+  client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN);
+  BufferPool::Client client;
+  ASSERT_OK(pool->RegisterClient("test client", &client_tracker, &client));
+  for (int i = 0; i < num_ops; ++i) {
+    BufferPool::PageHandle handle;
+    ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN));
+    ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle));
+    pool->Unpin(&client, &handle);
+    ASSERT_OK(pool->Pin(&client, &handle));
+    pool->DestroyPage(&client, &handle);
+    client_tracker.DecreaseReservation(TEST_BUFFER_LEN);
+  }
+  pool->DeregisterClient(&client);
+  client_tracker.Close();
+}
+
+/// Test error path where pool is unable to fulfill a reservation because it cannot evict
+/// unpinned pages.
+TEST_F(BufferPoolTest, CapacityExhausted) {
+  global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
+  // TODO: once we enable spilling, set up buffer pool so that spilling is disabled.
+  // Set up pool with one more buffer than reservations (so that we hit the reservation
+  // limit instead of the buffer limit).
+  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN * 2);
+
+  BufferPool::PageHandle handle1, handle2, handle3;
+
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", &global_reservations_, &client));
+  ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+
+  // Do not have enough reservations because we pinned the page.
+  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2), "");
+
+  // Even with reservations, we can only create one more unpinned page because we don't
+  // support eviction yet.
+  pool.Unpin(&client, &handle1);
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+  pool.Unpin(&client, &handle2);
+  ASSERT_FALSE(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3).ok());
+
+  // After destroying a page, we should have a free buffer that we can use.
+  pool.DestroyPage(&client, &handle1);
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3));
+
+  pool.DestroyPage(&client, &handle2);
+  pool.DestroyPage(&client, &handle3);
+  pool.DeregisterClient(&client);
 }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.cc b/be/src/bufferpool/buffer-pool.cc
index 21893fc..c89b8f6 100644
--- a/be/src/bufferpool/buffer-pool.cc
+++ b/be/src/bufferpool/buffer-pool.cc
@@ -31,13 +31,151 @@ using strings::Substitute;
 
 namespace impala {
 
+/// The internal representation of a page, which can be pinned or unpinned. If the
+/// page is pinned, a buffer is associated with the page.
+///
+/// Code manipulating the page is responsible for acquiring 'lock' when reading or
+/// modifying the page.
+struct BufferPool::Page : public BufferPool::PageList::Node {
+  /// Creates a page of 'len' bytes with a zero pin count, a closed buffer and the dirty
+  /// flag cleared. Explicit so that an integer is never implicitly converted to a Page.
+  explicit Page(int64_t len) : len(len), pin_count(0), dirty(false) {}
+
+  /// Increment the pin count. Caller must hold 'lock'. 'handle' is currently unused.
+  void IncrementPinCount(PageHandle* handle) {
+    lock.DCheckLocked();
+    ++pin_count;
+    // Pinned page buffers may be modified by anyone with a pointer to the buffer, so we
+    // have to assume they are dirty.
+    dirty = true;
+  }
+
+  /// Decrement the pin count, which must be positive. Caller must hold 'lock'.
+  /// 'handle' is currently unused.
+  void DecrementPinCount(PageHandle* handle) {
+    lock.DCheckLocked();
+    DCHECK(pin_count > 0);
+    --pin_count;
+  }
+
+  /// Returns a one-line description of the page. Reads members protected by 'lock';
+  /// the callers in this file acquire 'lock' before calling.
+  string DebugString() const {
+    return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3 dirty: $4", this,
+        len, pin_count, buffer.DebugString(), dirty);
+  }
+
+  /// Helper for BufferPool::DebugString(). Appends the description of 'page' to 'ss'
+  /// while holding the page's lock. Always returns true so that iteration continues.
+  static bool DebugStringCallback(stringstream* ss, BufferPool::Page* page) {
+    lock_guard<SpinLock> pl(page->lock);
+    (*ss) << page->DebugString() << "\n";
+    return true;
+  }
+
+  /// The length of the page in bytes.
+  const int64_t len;
+
+  /// Lock to protect the below members of Page. The lock must be held when modifying any
+  /// of the below members and when reading any of the below members of an unpinned page.
+  SpinLock lock;
+
+  /// The pin count of the page.
+  int pin_count;
+
+  /// Buffer with the page's contents. Always open if pinned. Closed if page is unpinned
+  /// and was evicted from memory.
+  BufferHandle buffer;
+
+  /// True if the buffer's contents need to be saved before evicting it from memory.
+  bool dirty;
+};
+
+BufferPool::BufferHandle::BufferHandle() {
+  Reset();
+}
+
+BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) {
+  *this = std::move(src);
+}
+
+BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(BufferHandle&& src) {
+  // The destination must be closed; otherwise the buffer it refers to would be lost.
+  DCHECK(!is_open());
+  // Take over src's client, data pointer and length, then leave src closed.
+  Open(src.client_, src.data_, src.len_);
+  src.Reset();
+  return *this;
+}
+
+void BufferPool::BufferHandle::Open(const Client* client, uint8_t* data, int64_t len) {
+  client_ = client;
+  data_ = data;
+  len_ = len;
+}
+
+void BufferPool::BufferHandle::Reset() {
+  client_ = NULL;
+  data_ = NULL;
+  len_ = -1;
+}
+
+BufferPool::PageHandle::PageHandle() {
+  Reset();
+}
+
+BufferPool::PageHandle::PageHandle(PageHandle&& src) {
+  *this = std::move(src);
+}
+
+BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) {
+  DCHECK(!is_open());
+  // Copy over all members then close src.
+  page_ = src.page_;
+  client_ = src.client_;
+  src.Reset();
+  return *this;
+}
+
+void BufferPool::PageHandle::Open(Page* page, Client* client) {
+  DCHECK(!is_open());
+  page->lock.DCheckLocked();
+  page_ = page;
+  client_ = client;
+}
+
+void BufferPool::PageHandle::Reset() {
+  page_ = NULL;
+  client_ = NULL;
+}
+
+int BufferPool::PageHandle::pin_count() const {
+  DCHECK(is_open());
+  // The pin count can only be modified via this PageHandle, which must not be
+  // concurrently accessed by multiple threads, so it is safe to access without locking
+  return page_->pin_count;
+}
+
+int64_t BufferPool::PageHandle::len() const {
+  DCHECK(is_open());
+  // The length of the page cannot change, so it is safe to access without locking.
+  return page_->len;
+}
+
+const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
+  DCHECK(is_pinned());
+  // The 'buffer' field cannot change while the page is pinned, so it is safe to access
+  // without locking.
+  return &page_->buffer;
+}
+
 BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
-  : min_buffer_len_(min_buffer_len), buffer_bytes_limit_(buffer_bytes_limit) {
+  : allocator_(new BufferAllocator(min_buffer_len)),
+    min_buffer_len_(min_buffer_len),
+    buffer_bytes_limit_(buffer_bytes_limit),
+    buffer_bytes_remaining_(buffer_bytes_limit) {
   DCHECK_GT(min_buffer_len, 0);
   DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
 }
 
-BufferPool::~BufferPool() {}
+BufferPool::~BufferPool() {
+  DCHECK(pages_.empty());
+}
 
 Status BufferPool::RegisterClient(
     const string& name, ReservationTracker* reservation, Client* client) {
@@ -55,15 +193,228 @@ void BufferPool::DeregisterClient(Client* client) {
   client->reservation_ = NULL;
 }
 
+Status BufferPool::CreatePage(Client* client, int64_t len, PageHandle* handle) {
+  // Same precondition that Pin() and UnpinLocked() check. The client's reservation is
+  // dereferenced below, so the client must still be registered.
+  DCHECK(client->is_registered());
+  DCHECK(!handle->is_open());
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+
+  BufferHandle buffer;
+  // No changes have been made to state yet, so we can cleanly return on error.
+  RETURN_IF_ERROR(AllocateBufferInternal(client, len, &buffer));
+
+  Page* page = new Page(len);
+  {
+    // Open() checks that the page lock is held, so initialize the page under its lock
+    // even though no other thread can see it yet.
+    lock_guard<SpinLock> pl(page->lock);
+    page->buffer = std::move(buffer);
+    handle->Open(page, client);
+    // A newly created page starts out pinned once via 'handle'.
+    page->IncrementPinCount(handle);
+  }
+
+  // Only add to globally-visible list after page is initialized. The page lock also
+  // needs to be released before enqueueing to respect the lock ordering.
+  pages_.Enqueue(page);
+
+  // Account for the initial pin against the client's reservation.
+  client->reservation_->AllocateFrom(len);
+  return Status::OK();
+}
+
+void BufferPool::DestroyPage(Client* client, PageHandle* handle) {
+  // Destroying an already-closed handle is a no-op, which keeps this idempotent.
+  if (!handle->is_open()) return;
+
+  if (!handle->is_pinned()) {
+    // An unpinned page consumes no reservation, so only its buffer (if it still has
+    // one) has to be freed before the page itself is torn down.
+    Page* unpinned = handle->page_;
+    {
+      lock_guard<SpinLock> page_lock(unpinned->lock);
+      // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
+      if (unpinned->buffer.is_open()) FreeBufferInternal(&unpinned->buffer);
+    }
+    CleanUpPage(handle);
+    return;
+  }
+
+  // Pinned page: ExtractBuffer() cleans up the page and hands back its buffer, and
+  // FreeBuffer() then frees that buffer and releases the reservation it was using.
+  BufferHandle extracted;
+  ExtractBuffer(client, handle, &extracted);
+  FreeBuffer(client, &extracted);
+}
+
+void BufferPool::CleanUpPage(PageHandle* handle) {
+  Page* doomed_page = handle->page_;
+  // Unlink the page from 'pages_' before freeing it. Threads that reach pages through
+  // the 'pages_' list hold 'pages_.lock_', so Remove() does not return until they are
+  // done, after which no other thread has a reference and the page can be deleted.
+  pages_.Remove(doomed_page);
+  handle->Reset();
+  delete doomed_page;
+}
+
+Status BufferPool::Pin(Client* client, PageHandle* handle) {
+  DCHECK(client->is_registered());
+  DCHECK(handle->is_open());
+  DCHECK_EQ(handle->client_, client);
+
+  Page* const target = handle->page_;
+  {
+    // Hold the page lock for the whole state transition.
+    lock_guard<SpinLock> page_lock(target->lock);
+    const bool has_buffer = target->buffer.is_open();
+    if (!has_buffer) {
+      // Nothing has been modified yet, so a failed allocation can simply be returned.
+      RETURN_IF_ERROR(AllocateBufferInternal(client, target->len, &target->buffer));
+    }
+    // TODO: will need to initiate/wait for read if the page is not in-memory.
+    target->IncrementPinCount(handle);
+  }
+
+  // Charge the additional pin to the client's reservation.
+  client->reservation_->AllocateFrom(target->len);
+  return Status::OK();
+}
+
+void BufferPool::Unpin(Client* client, PageHandle* handle) {
+  DCHECK(handle->is_open());
+  lock_guard<SpinLock> pl(handle->page_->lock);
+  UnpinLocked(client, handle);
+}
+
+void BufferPool::UnpinLocked(Client* client, PageHandle* handle) {
+  DCHECK(client->is_registered());
+  DCHECK_EQ(handle->client_, client);
+  // If handle is pinned, we can assume that the page itself is pinned.
+  DCHECK(handle->is_pinned());
+  Page* page = handle->page_;
+  page->lock.DCheckLocked();
+
+  page->DecrementPinCount(handle);
+  client->reservation_->ReleaseTo(page->len);
+
+  // TODO: can evict now. Only need to preserve contents if 'page->dirty' is true.
+}
+
+void BufferPool::ExtractBuffer(
+    Client* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
+  DCHECK(page_handle->is_pinned());
+  DCHECK_EQ(page_handle->client_, client);
+
+  Page* const source = page_handle->page_;
+  {
+    // Keep the page locked while its pin count and buffer are being changed.
+    lock_guard<SpinLock> page_lock(source->lock);
+    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
+
+    // Drop every pin beyond the first so that no surplus reservation stays in use.
+    // Each UnpinLocked() call lowers the pin count by exactly one.
+    for (int surplus_pins = source->pin_count - 1; surplus_pins > 0; --surplus_pins) {
+      UnpinLocked(client, page_handle);
+    }
+    *buffer_handle = std::move(source->buffer);
+  }
+  // The page lock must not be held here, so clean up outside the scope above.
+  CleanUpPage(page_handle);
+}
+
+Status BufferPool::AllocateBuffer(Client* client, int64_t len, BufferHandle* handle) {
+  // Charge the client's reservation up front: AllocateBufferInternal() assumes the
+  // reservation has already been consumed for the buffer.
+  client->reservation_->AllocateFrom(len);
+  Status status = AllocateBufferInternal(client, len, handle);
+  if (!status.ok()) {
+    // No buffer was handed out, so undo the accounting. Otherwise the reservation
+    // would stay consumed with 'handle' still closed, and FreeBuffer() returns early
+    // for a closed handle, so nothing could ever release it.
+    client->reservation_->ReleaseTo(len);
+  }
+  return status;
+}
+
+Status BufferPool::AllocateBufferInternal(
+    Client* client, int64_t len, BufferHandle* buffer) {
+  DCHECK(!buffer->is_open());
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+
+  if (!TryDecreaseBufferBytesRemaining(len)) {
+    // No headroom left: serving this request would require evicting another page.
+    return Status(TErrorCode::NOT_IMPLEMENTED_ERROR,
+        Substitute("Buffer bytes limit $0 of buffer pool is exhausted and page eviction is "
+                   "not implemented yet!", buffer_bytes_limit_));
+  }
+
+  // 'len' bytes of 'buffer_bytes_remaining_' are now claimed, so allocate a new buffer.
+  uint8_t* new_data;
+  Status alloc_status = allocator_->Allocate(len, &new_data);
+  if (!alloc_status.ok()) {
+    // Give the claimed bytes back before propagating the error.
+    buffer_bytes_remaining_.Add(len);
+    return alloc_status;
+  }
+  DCHECK(new_data != NULL);
+  buffer->Open(client, new_data, len);
+  return Status::OK();
+}
+
+void BufferPool::FreeBuffer(Client* client, BufferHandle* handle) {
+  if (!handle->is_open()) return; // Should be idempotent.
+  DCHECK_EQ(client, handle->client_);
+  client->reservation_->ReleaseTo(handle->len_);
+  FreeBufferInternal(handle);
+}
+
+void BufferPool::FreeBufferInternal(BufferHandle* handle) {
+  DCHECK(handle->is_open());
+  const int64_t freed_len = handle->len();
+  // Return the memory to the allocator, then make the bytes available for new buffers.
+  allocator_->Free(handle->data(), freed_len);
+  buffer_bytes_remaining_.Add(freed_len);
+  // Leave the handle closed so it cannot be freed twice.
+  handle->Reset();
+}
+
+Status BufferPool::TransferBuffer(
+    Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* dst) {
+  DCHECK(src->is_open());
+  DCHECK(!dst->is_open());
+  DCHECK_EQ(src_client, src->client_);
+  DCHECK_NE(src, dst);
+  DCHECK_NE(src_client, dst_client);
+
+  // Move the reservation usage for the buffer from the source to the destination
+  // client. The length is read up front because moving the handle closes 'src'.
+  const int64_t transferred_len = src->len();
+  dst_client->reservation_->AllocateFrom(transferred_len);
+  src_client->reservation_->ReleaseTo(transferred_len);
+
+  // Move the handle, then re-tag it with its new owner.
+  *dst = std::move(*src);
+  dst->client_ = dst_client;
+  return Status::OK();
+}
+
+bool BufferPool::TryDecreaseBufferBytesRemaining(int64_t len) {
+  // TODO: we may want to change this policy so that we don't always use up to the limit
+  // for buffers, since this may starve other operators using non-buffer-pool memory.
+  for (;;) {
+    const int64_t available = buffer_bytes_remaining_.Load();
+    // Not enough headroom: fail without modifying the counter.
+    if (len > available) return false;
+    // Retry from the top if another thread changed the counter since the load above.
+    if (buffer_bytes_remaining_.CompareAndSwap(available, available - len)) return true;
+  }
+}
+
 string BufferPool::Client::DebugString() const {
-  return Substitute("<BufferPool::Client> $0 name: $1 reservation: $2", this, name_,
-      reservation_->DebugString());
+  if (is_registered()) {
+    return Substitute("<BufferPool::Client> $0 name: $1 reservation: {$2}", this, name_,
+        reservation_->DebugString());
+  } else {
+    return Substitute("<BufferPool::Client> $0 UNREGISTERED", this);
+  }
+}
+
+string BufferPool::PageHandle::DebugString() const {
+  // A closed handle has no page or client to describe.
+  if (!is_open()) return Substitute("<BufferPool::PageHandle> $0 CLOSED", this);
+
+  // Hold the page lock while the page's state is read.
+  lock_guard<SpinLock> page_lock(page_->lock);
+  const string client_str = client_->DebugString();
+  const string page_str = page_->DebugString();
+  return Substitute(
+      "<BufferPool::PageHandle> $0 client: {$1} page: {$2}", this, client_str, page_str);
+}
+
+string BufferPool::BufferHandle::DebugString() const {
+  // A closed handle has no client, data or length to describe.
+  if (!is_open()) return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this);
+
+  const string client_str = client_->DebugString();
+  return Substitute("<BufferPool::BufferHandle> $0 client: {$1} data: $2 len: $3", this,
+      client_str, data_, len_);
 }
 
 string BufferPool::DebugString() {
   stringstream ss;
   ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_
-     << "buffer_bytes_limit: " << buffer_bytes_limit_ << "\n";
+     << " buffer_bytes_limit: " << buffer_bytes_limit_
+     << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n";
+  pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
   return ss.str();
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.h b/be/src/bufferpool/buffer-pool.h
index e566543..44b5574 100644
--- a/be/src/bufferpool/buffer-pool.h
+++ b/be/src/bufferpool/buffer-pool.h
@@ -18,10 +18,12 @@
 #ifndef IMPALA_BUFFER_POOL_H
 #define IMPALA_BUFFER_POOL_H
 
+#include <stdint.h>
+#include <boost/scoped_ptr.hpp>
 #include <boost/thread/locks.hpp>
 #include <string>
-#include <stdint.h>
 
+#include "bufferpool/buffer-allocator.h"
 #include "common/atomic.h"
 #include "common/status.h"
 #include "gutil/macros.h"
@@ -30,6 +32,7 @@
 
 namespace impala {
 
+class BufferAllocator;
 class ReservationTracker;
 
 /// A buffer pool that manages memory buffers for all queries in an Impala daemon.
@@ -69,7 +72,7 @@ class ReservationTracker;
 /// Buffer/Page Sizes
 /// =================
 /// The buffer pool has a minimum buffer size, which must be a power-of-two. Page and
-/// buffer sizes must be an exact multiple of the minimum buffer size.
+/// buffer sizes must be an exact power-of-two multiple of the minimum buffer size.
 ///
 /// Reservations
 /// ============
@@ -140,11 +143,18 @@ class ReservationTracker;
 /// +========================+
 /// | IMPLEMENTATION DETAILS |
 /// +========================+
-/// ... TODO ...
+///
+/// Lock Ordering
+/// =============
+/// The lock ordering is:
+/// * pages_::lock_ -> Page::lock
+///
+/// If a reference to a page is acquired via the pages_ list, pages_::lock_ must be held
+/// until done with the page to ensure the page isn't concurrently deleted.
 class BufferPool {
  public:
-  class Client;
   class BufferHandle;
+  class Client;
   class PageHandle;
 
   /// Constructs a new buffer pool.
@@ -198,9 +208,10 @@ class BufferPool {
 
   /// Extracts buffer from a pinned page. After this returns, the page referenced by
   /// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from
-  /// 'page_handle'. This may decrease reservation usage if the page was pinned multiple
-  /// times via 'page_handle'.
-  void ExtractBuffer(PageHandle* page_handle, BufferHandle* buffer_handle);
+  /// 'page_handle'. This may decrease reservation usage of 'client' if the page was
+  /// pinned multiple times via 'page_handle'.
+  void ExtractBuffer(
+      Client* client, PageHandle* page_handle, BufferHandle* buffer_handle);
 
   /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller
   /// is responsible for ensuring it has enough unused reservation before calling
@@ -214,9 +225,9 @@ class BufferPool {
 
   /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move the
   /// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' and
-  /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must
-  /// be closed
-  /// before calling. After a successful call, 'src' is closed and 'dst' is open.
+  /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must be
+  /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be different.
+  /// After a successful call, 'src' is closed and 'dst' is open.
   Status TransferBuffer(Client* src_client, BufferHandle* src, Client* dst_client,
       BufferHandle* dst);
 
@@ -228,13 +239,49 @@ class BufferPool {
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferPool);
+  struct Page;
+
+  /// Same as Unpin(), except the lock for the page referenced by 'handle' must be held
+  /// by the caller.
+  void UnpinLocked(Client* client, PageHandle* handle);
+
+  /// Perform the cleanup of the page object and handle when the page is destroyed.
+  /// Reset 'handle', free the Page object and remove the 'pages_' entry.
+  /// The 'handle->page_' lock should *not* be held by the caller.
+  void CleanUpPage(PageHandle* handle);
 
-  /// The minimum length of a buffer in bytes. All buffers and pages are a multiple of
-  /// this length. This is always a power of two.
+  /// Allocate a buffer of length 'len'. Assumes that the client's reservation has already
+  /// been consumed for the buffer. Returns an error if the pool is unable to fulfill the
+  /// reservation.
+  Status AllocateBufferInternal(Client* client, int64_t len, BufferHandle* buffer);
+
+  /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates
+  /// internal state but does not release to any reservation.
+  void FreeBufferInternal(BufferHandle* buffer);
+
+  /// Check if we can allocate another buffer of size 'len' bytes without
+  /// 'buffer_bytes_remaining_' going negative.
+  /// Returns true and decreases 'buffer_bytes_remaining_' by 'len' if successful.
+  bool TryDecreaseBufferBytesRemaining(int64_t len);
+
+  /// Allocator for allocating and freeing all buffer memory.
+  boost::scoped_ptr<BufferAllocator> allocator_;
+
+  /// The minimum length of a buffer in bytes. All buffers and pages are a power-of-two
+  /// multiple of this length. This is always a power of two.
   const int64_t min_buffer_len_;
 
   /// The maximum physical memory in bytes that can be used for buffers.
   const int64_t buffer_bytes_limit_;
+
+  /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used for
+  /// allocating new buffers. Must be updated atomically before a new buffer is
+  /// allocated or after an existing buffer is freed.
+  AtomicInt64 buffer_bytes_remaining_;
+
+  /// List containing all pages. Protected by the list's internal lock.
+  typedef InternalQueue<Page> PageList;
+  PageList pages_;
 };
 
 /// External representation of a client of the BufferPool. Clients are used for
@@ -266,6 +313,54 @@ class BufferPool::Client {
   ReservationTracker* reservation_;
 };
 
+/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only
+/// be used by a single thread at a time: concurrently calling BufferHandle methods or
+/// BufferPool methods with the BufferHandle as an argument is not supported.
+class BufferPool::BufferHandle {
+ public:
+  /// Constructs a closed handle.
+  BufferHandle();
+  /// The handle must be closed by the time it is destroyed.
+  ~BufferHandle() { DCHECK(!is_open()); }
+
+  /// Allow move construction of handles, to support std::move().
+  BufferHandle(BufferHandle&& src);
+
+  /// Allow move assignment of handles, to support STL classes like std::vector.
+  /// Destination must be closed. 'src' is closed after the move.
+  BufferHandle& operator=(BufferHandle&& src);
+
+  /// Returns true if the handle currently refers to a buffer.
+  bool is_open() const { return data_ != NULL; }
+  /// Get the length of the buffer in bytes. Only valid to call if the handle is open.
+  int64_t len() const {
+    DCHECK(is_open());
+    return len_;
+  }
+  /// Get a pointer to the start of the buffer. Only valid to call if the handle is open.
+  uint8_t* data() const {
+    DCHECK(is_open());
+    return data_;
+  }
+
+  std::string DebugString() const;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(BufferHandle);
+  friend class BufferPool;
+
+  /// Internal helper to set the handle to an opened state.
+  void Open(const Client* client, uint8_t* data, int64_t len);
+
+  /// Internal helper to reset the handle to an unopened state.
+  void Reset();
+
+  /// The client the buffer handle belongs to, used to validate that the correct client
+  /// is provided in BufferPool method calls. NULL if closed.
+  const Client* client_;
+
+  /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed.
+  uint8_t* data_;
+
+  /// Length of the buffer in bytes. Set to -1 if closed.
+  int64_t len_;
+};
 
 /// The handle for a page used by clients of the BufferPool. Each PageHandle should
 /// only be used by a single thread at a time: concurrently calling PageHandle methods
@@ -282,12 +377,13 @@ class BufferPool::PageHandle {
   // Destination must be closed.
   PageHandle& operator=(PageHandle&& src);
 
-  bool is_open() const;
-  bool is_pinned() const;
+  bool is_open() const { return page_ != NULL; }
+  bool is_pinned() const { return pin_count() > 0; }
+  int pin_count() const;
   int64_t len() const;
   /// Get a pointer to the start of the page's buffer. Only valid to call if the page
   /// is pinned via this handle.
-  uint8_t* data() const;
+  uint8_t* data() const { return buffer_handle()->data(); }
 
   /// Return a pointer to the page's buffer handle. Only valid to call if the page is
   /// pinned via this handle. Only const accessors of the returned handle can be used:
@@ -299,32 +395,21 @@ class BufferPool::PageHandle {
 
  private:
   DISALLOW_COPY_AND_ASSIGN(PageHandle);
-};
-
-/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only
-/// be used by a single thread at a time: concurrently calling BufferHandle methods or
-/// BufferPool methods with the BufferHandle as an argument is not supported.
-class BufferPool::BufferHandle {
- public:
-  BufferHandle();
-  ~BufferHandle() { DCHECK(!is_open()); }
+  friend class BufferPool;
+  friend class Page;
 
-  /// Allow move construction of handles, to support std::move().
-  BufferHandle(BufferHandle&& src);
+  /// Internal helper to open the handle for the given page.
+  void Open(Page* page, Client* client);
 
-  /// Allow move assignment of handles, to support STL classes like std::vector.
-  /// Destination must be uninitialized.
-  BufferHandle& operator=(BufferHandle&& src);
+  /// Internal helper to reset the handle to an unopened state.
+  void Reset();
 
-  bool is_open() const;
-  int64_t len() const;
-  /// Get a pointer to the start of the buffer.
-  uint8_t* data() const;
+  /// The internal page structure. NULL if the handle is not open.
+  Page* page_;
 
-  std::string DebugString() const;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(BufferHandle);
+  /// The client the page handle belongs to, used to validate that the correct client
+  /// is being used.
+  const Client* client_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/util/internal-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/internal-queue.h b/be/src/util/internal-queue.h
index 08365d7..37a9a0c 100644
--- a/be/src/util/internal-queue.h
+++ b/be/src/util/internal-queue.h
@@ -19,6 +19,7 @@
 #ifndef IMPALA_UTIL_INTERNAL_QUEUE_H
 #define IMPALA_UTIL_INTERNAL_QUEUE_H
 
+#include <boost/function.hpp>
 #include <boost/thread/locks.hpp>
 
 #include "util/spinlock.h"
@@ -231,6 +232,16 @@ class InternalQueue {
     return true;
   }
 
+  // Visits the elements of the queue from head to tail while holding the queue's
+  // lock, invoking 'fn' on each one. Stops early as soon as 'fn' returns false.
+  // 'fn' must not call any other InternalQueue method.
+  void Iterate(boost::function<bool(T*)> fn) {
+    boost::lock_guard<SpinLock> lock(lock_);
+    Node* current = head_;
+    while (current != NULL) {
+      if (!fn(reinterpret_cast<T*>(current))) break;
+      current = current->next;
+    }
+  }
+
   /// Prints the queue ptrs to a string.
   std::string DebugString() {
     std::stringstream ss;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 3d48005..2554a18 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -283,7 +283,9 @@ error_codes = (
    "supported length of 2147483647 bytes."),
 
   ("SCRATCH_LIMIT_EXCEEDED", 91, "Scratch space limit of $0 bytes exceeded for query "
-   "while spilling data to disk.")
+   "while spilling data to disk."),
+
+  ("BUFFER_ALLOCATION_FAILED", 92, "Unexpected error allocating $0 byte buffer."),
 )
 
 import sys


[4/6] incubator-impala git commit: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Posted by ta...@apache.org.
IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

The main outcome of this patch is to split out PhjBuilder from
PartitionedHashJoinNode, which manages the build partitions for the
join and implements the DataSink interface.

A lot of this work is fairly mechanical refactoring: dividing the state
between the two classes and duplicating it where appropriate. One major
change is that probe partitions need to be separated from build
partitions.

This required some significant algorithmic changes to memory management
for probe partition buffers, because memory management for probe partitions
was entangled with the build partitions: small buffers were allocated for
the probe partitions even for non-spilling joins, and the join could
spill additional partitions during probing if the probe partitions needed
to switch from small to I/O buffers. The changes made were:
- Probe partitions are only initialized after the build is partitioned, and
  only for spilled build partitions.
- Probe partitions never use small buffers: once the initial write
  buffer is allocated, appending to the probe partition never fails.
- All probe partition allocation is done after partitioning the build
  and before processing the probe input during the same phase as hash
  table building. (Aside from NAAJ partitions which are allocated
  upfront).

The probe partition changes necessitated a change in
BufferedTupleStream: allocation of write blocks is now explicit via the
PrepareForWrite() API.

Testing:
Ran exhaustive build and local stress test.

Memory Usage:
Ran stress test binary search locally for TPC-DS SF-1 and TPC-H SF-20.
No regressions on TPC-DS. TPC-H either stayed the same or improved in
min memory requirement without spilling, but the min memory requirement
with spilling regressed in some cases. I investigated each of the
significant regressions on TPC-H and determined that they were all due
to exec nodes racing for spillable or non-spillable memory. None of them
were cases where exec nodes got their minimum reservation and failed to
execute the spilling algorithm correctly.

Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
Reviewed-on: http://gerrit.cloudera.org:8080/3873
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1ccd83b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1ccd83b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1ccd83b2

Branch: refs/heads/master
Commit: 1ccd83b2d0cf60c3b711a5d9bd758438ab846150
Parents: b7d107a
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jun 2 17:48:15 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Sep 28 23:07:37 2016 +0000

----------------------------------------------------------------------
 .clang-format                                   |    3 +-
 be/src/codegen/gen_ir_descriptions.py           |    4 +-
 be/src/codegen/impala-ir.cc                     |    1 +
 be/src/exec/CMakeLists.txt                      |    2 +
 be/src/exec/analytic-eval-node.cc               |   15 +-
 be/src/exec/blocking-join-node.cc               |    4 +-
 be/src/exec/blocking-join-node.h                |    4 +-
 be/src/exec/data-sink.cc                        |   16 +-
 be/src/exec/hash-join-node.cc                   |    2 +-
 be/src/exec/nested-loop-join-builder.cc         |    3 +-
 be/src/exec/nested-loop-join-builder.h          |   14 +-
 be/src/exec/nested-loop-join-node.cc            |    6 +-
 be/src/exec/partitioned-aggregation-node.cc     |   34 +-
 be/src/exec/partitioned-hash-join-builder-ir.cc |  110 ++
 be/src/exec/partitioned-hash-join-builder.cc    |  900 ++++++++++++
 be/src/exec/partitioned-hash-join-builder.h     |  460 ++++++
 be/src/exec/partitioned-hash-join-node-ir.cc    |  135 +-
 be/src/exec/partitioned-hash-join-node.cc       | 1349 +++++-------------
 be/src/exec/partitioned-hash-join-node.h        |  513 +++----
 be/src/exec/partitioned-hash-join-node.inline.h |    6 -
 be/src/runtime/buffered-block-mgr.cc            |    9 +-
 be/src/runtime/buffered-tuple-stream-test.cc    |   29 +-
 be/src/runtime/buffered-tuple-stream.cc         |   71 +-
 be/src/runtime/buffered-tuple-stream.h          |   24 +-
 be/src/util/runtime-profile.cc                  |   37 +-
 be/src/util/runtime-profile.h                   |   13 +-
 tests/stress/concurrent_select.py               |    6 +-
 27 files changed, 2272 insertions(+), 1498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/.clang-format
----------------------------------------------------------------------
diff --git a/.clang-format b/.clang-format
index 2de4b96..c0b9030 100644
--- a/.clang-format
+++ b/.clang-format
@@ -1,6 +1,7 @@
 BasedOnStyle: Google
 AlignAfterOpenBracket: 'false'
 AlignOperands: 'false'
+AllowShortCaseLabelsOnASingleLine: 'true'
 AllowShortFunctionsOnASingleLine: 'Inline'
 AllowShortIfStatementsOnASingleLine: 'true'
 BreakBeforeBinaryOperators: 'NonAssignment'
@@ -11,4 +12,4 @@ ContinuationIndentWidth: '4'
 DerivePointerAlignment: 'false'
 PenaltyBreakBeforeFirstCallParameter: '99999999'
 SpacesBeforeTrailingComments: '1'
-Standard: 'Cpp11'
\ No newline at end of file
+Standard: 'Cpp11'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index a12d73d..60244a7 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -81,7 +81,7 @@ ir_functions = [
   ["HASH_MURMUR", "IrMurmurHash"],
   ["HASH_JOIN_PROCESS_BUILD_BATCH", "12HashJoinNode17ProcessBuildBatch"],
   ["HASH_JOIN_PROCESS_PROBE_BATCH", "12HashJoinNode17ProcessProbeBatch"],
-  ["PHJ_PROCESS_BUILD_BATCH", "23PartitionedHashJoinNode17ProcessBuildBatch"],
+  ["PHJ_PROCESS_BUILD_BATCH", "10PhjBuilder17ProcessBuildBatch"],
   ["PHJ_PROCESS_PROBE_BATCH_INNER_JOIN", "ProcessProbeBatchILi0"],
   ["PHJ_PROCESS_PROBE_BATCH_LEFT_OUTER_JOIN", "ProcessProbeBatchILi1"],
   ["PHJ_PROCESS_PROBE_BATCH_LEFT_SEMI_JOIN", "ProcessProbeBatchILi2"],
@@ -91,7 +91,7 @@ ir_functions = [
   ["PHJ_PROCESS_PROBE_BATCH_RIGHT_SEMI_JOIN", "ProcessProbeBatchILi6"],
   ["PHJ_PROCESS_PROBE_BATCH_RIGHT_ANTI_JOIN", "ProcessProbeBatchILi7"],
   ["PHJ_PROCESS_PROBE_BATCH_FULL_OUTER_JOIN", "ProcessProbeBatchILi8"],
-  ["PHJ_INSERT_BATCH", "9Partition11InsertBatch"],
+  ["PHJ_INSERT_BATCH", "10PhjBuilder9Partition11InsertBatch"],
   ["HASH_TABLE_GET_HASH_SEED", "GetHashSeed"],
   ["HASH_TABLE_GET_BUILD_EXPR_CTX", "HashTableCtx15GetBuildExprCtx"],
   ["HASH_TABLE_GET_PROBE_EXPR_CTX", "HashTableCtx15GetProbeExprCtx"],

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 185d6c2..9b14058 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -29,6 +29,7 @@
 #include "exec/hdfs-parquet-scanner-ir.cc"
 #include "exec/hdfs-scanner-ir.cc"
 #include "exec/partitioned-aggregation-node-ir.cc"
+#include "exec/partitioned-hash-join-builder-ir.cc"
 #include "exec/partitioned-hash-join-node-ir.cc"
 #include "exec/topn-node-ir.cc"
 #include "exprs/aggregate-functions-ir.cc"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 8f12656..25f2202 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -73,6 +73,8 @@ add_library(Exec
   parquet-metadata-utils.cc
   partitioned-aggregation-node.cc
   partitioned-aggregation-node-ir.cc
+  partitioned-hash-join-builder.cc
+  partitioned-hash-join-builder-ir.cc
   partitioned-hash-join-node.cc
   partitioned-hash-join-node-ir.cc
   kudu-scanner.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index c9e35d8..21a0805 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -32,11 +32,6 @@
 
 static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB
 
-const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
-    "Failed to acquire initial read buffer for analytic function evaluation. Reducing "
-    "query concurrency or increasing the memory limit may help this query to complete "
-    "successfully.";
-
 using namespace strings;
 
 namespace impala {
@@ -200,10 +195,15 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
       state->block_mgr(), client_, false /* use_initial_small_buffers */,
       true /* read_write */);
   RETURN_IF_ERROR(input_stream_->Init(id(), runtime_profile(), true));
+  bool got_write_buffer;
+  RETURN_IF_ERROR(input_stream_->PrepareForWrite(&got_write_buffer));
+  if (!got_write_buffer) {
+    return state->block_mgr()->MemLimitTooLowError(client_, id());
+  }
   bool got_read_buffer;
   RETURN_IF_ERROR(input_stream_->PrepareForRead(true, &got_read_buffer));
   if (!got_read_buffer) {
-    return mem_tracker()->MemLimitExceeded(state, PREPARE_FOR_READ_FAILED_ERROR_MSG);
+    return state->block_mgr()->MemLimitTooLowError(client_, id());
   }
 
   DCHECK_EQ(evaluators_.size(), fn_ctxs_.size());
@@ -367,7 +367,8 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
     // the stream and continue writing/reading in unpinned mode.
     // TODO: Consider re-pinning later if the output stream is fully consumed.
     RETURN_IF_ERROR(status);
-    RETURN_IF_ERROR(input_stream_->UnpinStream());
+    RETURN_IF_ERROR(
+        input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
     VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
     if (!input_stream_->AddRow(row, &status)) {
       // Rows should be added in unpinned mode unless an error occurs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 309bde4..b114451 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -174,8 +174,8 @@ Status BlockingJoinNode::Open(RuntimeState* state) {
   return Status::OK();
 }
 
-Status BlockingJoinNode::ConstructBuildAndOpenProbe(RuntimeState* state,
-    DataSink* build_sink) {
+Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
+    RuntimeState* state, DataSink* build_sink) {
   // If this node is not inside a subplan and can get a thread token, initiate the
   // construction of the build-side table in a separate thread, so that the left child
   // can do any initialisation in parallel. Otherwise, do this in the main thread.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 14b1c72..4000b22 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -104,7 +104,7 @@ class BlockingJoinNode : public ExecNode {
   MonotonicStopWatch built_probe_overlap_stop_watch_;
 
   /// Processes the build-side input.
-  /// Called from ConstructBuildAndOpenProbe() if the subclass does not provide a
+  /// Called from ProcessBuildInputAndOpenProbe() if the subclass does not provide a
   /// DataSink to consume the build input.
   /// Note that this can be called concurrently with Open'ing the left child to
   /// increase parallelism. If, for example, the left child is another join node,
@@ -116,7 +116,7 @@ class BlockingJoinNode : public ExecNode {
   /// if the plan shape and thread token availability permit it.
   /// If 'build_sink' is non-NULL, sends the build-side input to 'build_sink'. Otherwise
   /// calls ProcessBuildInput on the subclass.
-  Status ConstructBuildAndOpenProbe(RuntimeState* state, DataSink* build_sink);
+  Status ProcessBuildInputAndOpenProbe(RuntimeState* state, DataSink* build_sink);
 
   /// Helper function to process the build input by sending it to a DataSink.
   /// ASYNC_BUILD enables timers that impose some overhead but are required if the build

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 8c8b2dc..b6ec0ee 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -22,25 +22,30 @@
 
 #include "common/logging.h"
 #include "exec/exec-node.h"
-#include "exec/hdfs-table-sink.h"
 #include "exec/hbase-table-sink.h"
+#include "exec/hdfs-table-sink.h"
 #include "exec/kudu-table-sink.h"
 #include "exec/kudu-util.h"
 #include "exprs/expr.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gutil/strings/substitute.h"
 #include "runtime/data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "util/container-util.h"
 
 #include "common/names.h"
 
+using strings::Substitute;
+
 namespace impala {
 
 DataSink::DataSink(const RowDescriptor& row_desc) :
     closed_(false), row_desc_(row_desc), mem_tracker_(NULL) {}
 
-DataSink::~DataSink() {}
+DataSink::~DataSink() {
+  DCHECK(closed_);
+}
 
 Status DataSink::CreateDataSink(ObjectPool* pool,
     const TDataSink& thrift_sink, const vector<TExpr>& output_exprs,
@@ -153,15 +158,18 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
   DCHECK(mem_tracker != NULL);
   profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), GetName()));
   mem_tracker_ = mem_tracker;
-  expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker, false));
+  expr_mem_tracker_.reset(
+      new MemTracker(-1, Substitute("$0 Exprs", GetName()), mem_tracker, false));
   return Status::OK();
 }
 
 void DataSink::Close(RuntimeState* state) {
+  if (closed_) return;
   if (expr_mem_tracker_ != NULL) {
     expr_mem_tracker_->UnregisterFromParent();
     expr_mem_tracker_.reset();
   }
+  closed_ = true;
 }
 
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 52ba1d1..9106159 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -205,7 +205,7 @@ Status HashJoinNode::Open(RuntimeState* state) {
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
 
-  RETURN_IF_ERROR(BlockingJoinNode::ConstructBuildAndOpenProbe(state, NULL));
+  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, NULL));
   RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
   InitGetNext();
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/nested-loop-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index 4315b93..58610e7 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -32,8 +32,7 @@ NljBuilder::NljBuilder(const RowDescriptor& row_desc, RuntimeState* state,
     : DataSink(row_desc), build_batch_cache_(row_desc, state->batch_size(),
       mem_tracker) {}
 
-Status NljBuilder::Prepare(RuntimeState* state,
-    MemTracker* mem_tracker) {
+Status NljBuilder::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/nested-loop-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.h b/be/src/exec/nested-loop-join-builder.h
index a64315a..3fb5348 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -39,13 +39,13 @@ class NljBuilder : public DataSink {
  public:
   NljBuilder(const RowDescriptor& row_desc, RuntimeState* state, MemTracker* mem_tracker);
 
-  /// Implementations of DataSink methods.
-  virtual std::string GetName() { return "Nested Loop Join Build"; }
-  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker);
-  virtual Status Open(RuntimeState* state);
-  virtual Status Send(RuntimeState* state, RowBatch* batch);
-  virtual Status FlushFinal(RuntimeState* state);
-  virtual void Close(RuntimeState* state);
+  /// Implementations of DataSink interface methods.
+  virtual std::string GetName() override { return "Nested Loop Join Builder"; }
+  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status Send(RuntimeState* state, RowBatch* batch) override;
+  virtual Status FlushFinal(RuntimeState* state) override;
+  virtual void Close(RuntimeState* state) override;
 
   /// Reset the builder to the same state as it was in after calling Open().
   void Reset();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 87fbf62..0a115c6 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -81,7 +81,7 @@ Status NestedLoopJoinNode::Open(RuntimeState* state) {
     build_batches_ = builder_->input_build_batches();
   } else {
     RETURN_IF_ERROR(
-        BlockingJoinNode::ConstructBuildAndOpenProbe(state, builder_.get()));
+        BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
     build_batches_ = builder_->GetFinalBuildBatches();
     if (matching_build_rows_ != NULL) {
       RETURN_IF_ERROR(ResetMatchingBuildRows(state, build_batches_->total_num_rows()));
@@ -102,9 +102,9 @@ Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(Expr::Prepare(
       join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker()));
 
-  builder_.reset(
-      new NljBuilder(child(1)->row_desc(), state, mem_tracker()));
+  builder_.reset(new NljBuilder(child(1)->row_desc(), state, mem_tracker()));
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
+  runtime_profile()->PrependChild(builder_->profile());
 
   // For some join modes we need to record the build rows with matches in a bitmap.
   if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::RIGHT_SEMI_JOIN ||

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index ba2d9f7..f926725 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -285,6 +285,11 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
         state->block_mgr(), block_mgr_client_, false /* use_initial_small_buffers */,
         false /* read_write */));
     RETURN_IF_ERROR(serialize_stream_->Init(id(), runtime_profile(), false));
+    bool got_buffer;
+    RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer));
+    if (!got_buffer) {
+      return state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id());
+    }
     DCHECK(serialize_stream_->has_write_block());
   }
 
@@ -755,6 +760,12 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
       false /* read_write */, external_varlen_slots));
   RETURN_IF_ERROR(
       aggregated_row_stream->Init(parent->id(), parent->runtime_profile(), true));
+  bool got_buffer;
+  RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
+  if (!got_buffer) {
+    return parent->state_->block_mgr()->MemLimitTooLowError(
+        parent->block_mgr_client_, parent->id());
+  }
 
   if (!parent->is_streaming_preagg_) {
     unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
@@ -764,6 +775,12 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
     // This stream is only used to spill, no need to ever have this pinned.
     RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), parent->runtime_profile(),
         false));
+    // TODO: allocate this buffer later only if we spill the partition.
+    RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
+    if (!got_buffer) {
+      return parent->state_->block_mgr()->MemLimitTooLowError(
+          parent->block_mgr_client_, parent->id());
+    }
     DCHECK(unaggregated_row_stream->has_write_block());
   }
   return Status::OK();
@@ -844,6 +861,14 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
         false /* read_write */));
     status = parent->serialize_stream_->Init(parent->id(), parent->runtime_profile(),
         false);
+    if (status.ok()) {
+      bool got_buffer;
+      status = parent->serialize_stream_->PrepareForWrite(&got_buffer);
+      if (status.ok() && !got_buffer) {
+        status = parent->state_->block_mgr()->MemLimitTooLowError(
+            parent->block_mgr_client_, parent->id());
+      }
+    }
     if (!status.ok()) {
       hash_tbl->Close();
       hash_tbl.reset();
@@ -887,7 +912,8 @@ Status PartitionedAggregationNode::Partition::Spill() {
   // TODO: when not repartitioning, don't leave the write block pinned.
   DCHECK(!got_buffer || aggregated_row_stream->has_write_block())
       << aggregated_row_stream->DebugString();
-  RETURN_IF_ERROR(aggregated_row_stream->UnpinStream(false));
+  RETURN_IF_ERROR(
+      aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
 
   if (got_buffer && unaggregated_row_stream->using_small_buffers()) {
     RETURN_IF_ERROR(unaggregated_row_stream->SwitchToIoBuffers(&got_buffer));
@@ -1372,8 +1398,10 @@ Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
       // TODO: we only need to do this when we have memory pressure. This might be
       // okay though since the block mgr should only write these to disk if there
       // is memory pressure.
-      RETURN_IF_ERROR(partition->aggregated_row_stream->UnpinStream(true));
-      RETURN_IF_ERROR(partition->unaggregated_row_stream->UnpinStream(true));
+      RETURN_IF_ERROR(
+          partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+      RETURN_IF_ERROR(partition->unaggregated_row_stream->UnpinStream(
+          BufferedTupleStream::UNPIN_ALL));
 
       // Push new created partitions at the front. This means a depth first walk
       // (more finely partitioned partitions are processed first). This allows us

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
new file mode 100644
index 0000000..21fd9e4
--- /dev/null
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/partitioned-hash-join-builder.h"
+
+#include "codegen/impala-ir.h"
+#include "exec/hash-table.inline.h"
+#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/raw-value.inline.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-filter.h"
+#include "util/bloom-filter.h"
+
+#include "common/names.h"
+
+using namespace impala;
+
+inline Status PhjBuilder::AppendRow(BufferedTupleStream* stream, TupleRow* row) {
+  Status status;
+  if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
+  RETURN_IF_ERROR(status);
+  return AppendRowStreamFull(stream, row);
+}
+
+Status PhjBuilder::ProcessBuildBatch(
+    RowBatch* build_batch, HashTableCtx* ctx, bool build_filters) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ctx->expr_values_cache();
+  expr_vals_cache->Reset();
+  FOREACH_ROW(build_batch, 0, build_batch_iter) {
+    TupleRow* build_row = build_batch_iter.Get();
+    if (!ctx->EvalAndHashBuild(build_row)) {
+      if (null_aware_partition_ != NULL) {
+        // TODO: remove with codegen/template
+        // If we are NULL aware and this build row has NULL in the eq join slot,
+        // append it to the null_aware partition. We will need it later.
+        RETURN_IF_ERROR(AppendRow(null_aware_partition_->build_rows(), build_row));
+      }
+      continue;
+    }
+    if (build_filters) {
+      DCHECK_EQ(ctx->level(), 0)
+          << "Runtime filters should not be built during repartitioning.";
+      for (const FilterContext& ctx : filters_) {
+        // TODO: codegen expr evaluation and hashing
+        if (ctx.local_bloom_filter == NULL) continue;
+        void* e = ctx.expr->GetValue(build_row);
+        uint32_t filter_hash = RawValue::GetHashValue(
+            e, ctx.expr->root()->type(), RuntimeFilterBank::DefaultHashSeed());
+        ctx.local_bloom_filter->Insert(filter_hash);
+      }
+    }
+    const uint32_t hash = expr_vals_cache->CurExprValuesHash();
+    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+    Partition* partition = hash_partitions_[partition_idx];
+    RETURN_IF_ERROR(AppendRow(partition->build_rows(), build_row));
+  }
+  return Status::OK();
+}
+
+bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode,
+    HashTableCtx* ht_ctx, RowBatch* batch,
+    const vector<BufferedTupleStream::RowIdx>& indices) {
+  // Compute the hash values and prefetch the hash table buckets.
+  const int num_rows = batch->num_rows();
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int prefetch_size = expr_vals_cache->capacity();
+  const BufferedTupleStream::RowIdx* row_indices = indices.data();
+  for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
+       prefetch_group_row += prefetch_size) {
+    int cur_row = prefetch_group_row;
+    expr_vals_cache->Reset();
+    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
+      if (ht_ctx->EvalAndHashBuild(batch_iter.Get())) {
+        if (prefetch_mode != TPrefetchMode::NONE) {
+          hash_tbl_->PrefetchBucket<false>(expr_vals_cache->CurExprValuesHash());
+        }
+      } else {
+        expr_vals_cache->SetRowNull();
+      }
+      expr_vals_cache->NextRow();
+    }
+    // Do the insertion.
+    expr_vals_cache->ResetForRead();
+    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
+      TupleRow* row = batch_iter.Get();
+      BufferedTupleStream::RowIdx row_idx = row_indices[cur_row];
+      if (!expr_vals_cache->IsRowNull()
+          && UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) {
+        return false;
+      }
+      expr_vals_cache->NextRow();
+      ++cur_row;
+    }
+  }
+  return true;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
new file mode 100644
index 0000000..b17bbff
--- /dev/null
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -0,0 +1,900 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/partitioned-hash-join-builder.h"
+
+#include <gutil/strings/substitute.h>
+
+#include "codegen/llvm-codegen.h"
+#include "exec/hash-table.inline.h"
+#include "exprs/expr-context.h"
+#include "exprs/expr.h"
+#include "runtime/buffered-tuple-stream.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-filter-bank.h"
+#include "runtime/runtime-filter.h"
+#include "runtime/runtime-state.h"
+#include "util/bloom-filter.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
+    "Failed to acquire initial read "
+    "buffer for stream in hash join node $0. Reducing query concurrency or increasing "
+    "the memory limit may help this query to complete successfully.";
+
+using namespace impala;
+using namespace llvm;
+using namespace strings;
+using std::unique_ptr;
+
+PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op,
+    const RowDescriptor& probe_row_desc, const RowDescriptor& build_row_desc,
+    RuntimeState* state)
+  : DataSink(build_row_desc),
+    runtime_state_(state),
+    join_node_id_(join_node_id),
+    join_op_(join_op),
+    probe_row_desc_(probe_row_desc),
+    block_mgr_client_(NULL),
+    non_empty_build_(false),
+    partitions_created_(NULL),
+    largest_partition_percent_(NULL),
+    max_partition_level_(NULL),
+    num_build_rows_partitioned_(NULL),
+    num_hash_collisions_(NULL),
+    num_hash_buckets_(NULL),
+    num_spilled_partitions_(NULL),
+    num_repartitions_(NULL),
+    partition_build_rows_timer_(NULL),
+    build_hash_table_timer_(NULL),
+    repartition_timer_(NULL),
+    null_aware_partition_(NULL),
+    process_build_batch_fn_(NULL),
+    process_build_batch_fn_level0_(NULL),
+    insert_batch_fn_(NULL),
+    insert_batch_fn_level0_(NULL) {}
+
+Status PhjBuilder::Init(RuntimeState* state,
+    const vector<TEqJoinCondition>& eq_join_conjuncts,
+    const vector<TRuntimeFilterDesc>& filters) {
+  for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
+    ExprContext* ctx;
+    RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, eq_join_conjunct.right, &ctx));
+    build_expr_ctxs_.push_back(ctx);
+    is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from);
+  }
+
+  for (const TRuntimeFilterDesc& filter : filters) {
+    // If filter propagation is not enabled, only consider building filters for broadcast
+    // joins (that may be consumed by this fragment).
+    if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL
+        && !filter.is_broadcast_join) {
+      continue;
+    }
+    if (state->query_options().disable_row_runtime_filtering
+        && !filter.applied_on_partition_columns) {
+      continue;
+    }
+    FilterContext filter_ctx;
+    filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, true);
+    RETURN_IF_ERROR(Expr::CreateExprTree(&pool_, filter.src_expr, &filter_ctx.expr));
+    filters_.push_back(filter_ctx);
+  }
+  return Status::OK();
+}
+
+string PhjBuilder::GetName() {
+  return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id_);
+}
+
+Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
+  RETURN_IF_ERROR(
+      Expr::Prepare(build_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));
+  expr_ctxs_to_free_.insert(
+      expr_ctxs_to_free_.end(), build_expr_ctxs_.begin(), build_expr_ctxs_.end());
+
+  for (const FilterContext& ctx : filters_) {
+    RETURN_IF_ERROR(ctx.expr->Prepare(state, row_desc_, expr_mem_tracker_.get()));
+    expr_ctxs_to_free_.push_back(ctx.expr);
+  }
+  RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, build_expr_ctxs_,
+      HashTableStoresNulls(), is_not_distinct_from_, state->fragment_hash_seed(),
+      MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), mem_tracker_, &ht_ctx_));
+  RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
+      Substitute("PartitionedHashJoin id=$0 builder=$1", join_node_id_, this),
+      MinRequiredBuffers(), true, mem_tracker, state, &block_mgr_client_));
+
+  partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
+  largest_partition_percent_ =
+      profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
+  max_partition_level_ =
+      profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT);
+  num_build_rows_partitioned_ =
+      ADD_COUNTER(profile(), "BuildRowsPartitioned", TUnit::UNIT);
+  num_hash_collisions_ = ADD_COUNTER(profile(), "HashCollisions", TUnit::UNIT);
+  num_hash_buckets_ = ADD_COUNTER(profile(), "HashBuckets", TUnit::UNIT);
+  num_spilled_partitions_ = ADD_COUNTER(profile(), "SpilledPartitions", TUnit::UNIT);
+  num_repartitions_ = ADD_COUNTER(profile(), "NumRepartitions", TUnit::UNIT);
+  partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime");
+  build_hash_table_timer_ = ADD_TIMER(profile(), "HashTablesBuildTime");
+  repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime");
+
+  Codegen(state);
+  return Status::OK();
+}
+
+Status PhjBuilder::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
+  for (const FilterContext& filter : filters_) RETURN_IF_ERROR(filter.expr->Open(state));
+  RETURN_IF_ERROR(CreateHashPartitions(0));
+  AllocateRuntimeFilters();
+
+  if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+    RETURN_IF_ERROR(CreateAndPreparePartition(0, &null_aware_partition_));
+  }
+  return Status::OK();
+}
+
+Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(partition_build_rows_timer_);
+  bool build_filters = ht_ctx_->level() == 0 && filters_.size() > 0;
+  if (process_build_batch_fn_ == NULL) {
+    RETURN_IF_ERROR(ProcessBuildBatch(batch, ht_ctx_.get(), build_filters));
+  } else {
+    DCHECK(process_build_batch_fn_level0_ != NULL);
+    if (ht_ctx_->level() == 0) {
+      RETURN_IF_ERROR(
+          process_build_batch_fn_level0_(this, batch, ht_ctx_.get(), build_filters));
+    } else {
+      RETURN_IF_ERROR(process_build_batch_fn_(this, batch, ht_ctx_.get(), build_filters));
+    }
+  }
+
+  // Free any local allocations made during partitioning.
+  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
+  COUNTER_ADD(num_build_rows_partitioned_, batch->num_rows());
+  return Status::OK();
+}
+
+// Finishes the current build input: records how skewed the partitioning was, logs a
+// per-partition summary at VLOG level 2, publishes runtime filters (level 0 only) and
+// finally builds hash tables and prepares probe streams for spilled partitions.
+Status PhjBuilder::FlushFinal(RuntimeState* state) {
+  int64_t total_rows = 0;
+  for (Partition* p : hash_partitions_) total_rows += p->build_rows()->num_rows();
+
+  if (total_rows > 0) {
+    double largest_fraction = 0.0;
+    for (Partition* p : hash_partitions_) {
+      const double fraction =
+          p->build_rows()->num_rows() / static_cast<double>(total_rows);
+      largest_fraction = max(largest_fraction, fraction);
+    }
+    COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(largest_fraction * 100));
+  }
+
+  if (VLOG_IS_ON(2)) {
+    stringstream summary;
+    summary << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:",
+        join_node_id_, hash_partitions_[0]->level(), total_rows);
+    for (int idx = 0; idx < hash_partitions_.size(); ++idx) {
+      Partition* p = hash_partitions_[idx];
+      double percent = 0.0;
+      if (total_rows != 0) {
+        percent = p->build_rows()->num_rows() * 100 / static_cast<double>(total_rows);
+      }
+      summary << "  " << idx << " " << (p->is_spilled() ? "spilled" : "not spilled")
+              << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
+              << "    #rows:" << p->build_rows()->num_rows() << endl;
+    }
+    VLOG(2) << summary.str();
+  }
+
+  if (ht_ctx_->level() == 0) {
+    PublishRuntimeFilters(total_rows);
+    if (total_rows > 0) non_empty_build_ = true;
+  }
+
+  return BuildHashTablesAndPrepareProbeStreams();
+}
+
+// Releases everything held by the builder: local expression allocations, all
+// partitions and probe streams, the hash table context, the build and filter
+// expressions, block manager reservations and the object pool. Calls after the first
+// return immediately because 'closed_' is already set.
+void PhjBuilder::Close(RuntimeState* state) {
+  if (closed_) return;
+  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
+  CloseAndDeletePartitions();
+  // 'ht_ctx_' and 'block_mgr_client_' are NULL-checked, so neither is assumed to have
+  // been created by the time Close() runs.
+  if (ht_ctx_ != NULL) ht_ctx_->Close();
+  Expr::Close(build_expr_ctxs_, state);
+  for (const FilterContext& ctx : filters_) ctx.expr->Close(state);
+  if (block_mgr_client_ != NULL) state->block_mgr()->ClearReservations(block_mgr_client_);
+  pool_.Clear();
+  DataSink::Close(state);
+  closed_ = true;
+}
+
+// Discards the state of the current build: frees local expression allocations, closes
+// and deletes all partitions and probe streams, and clears 'non_empty_build_'.
+void PhjBuilder::Reset() {
+  ExprContext::FreeLocalAllocations(expr_ctxs_to_free_);
+  CloseAndDeletePartitions();
+  non_empty_build_ = false;
+}
+
+// Creates a new partition at 'level' owned by 'all_partitions_', returns it through
+// 'partition' and prepares its build row stream for writing. Returns a
+// mem-limit-too-low error if the stream could not get its initial write buffer.
+Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) {
+  all_partitions_.emplace_back(new Partition(runtime_state_, this, level));
+  Partition* created = all_partitions_.back().get();
+  // The out-parameter is set before any fallible call, as the partition already exists.
+  *partition = created;
+
+  BufferedTupleStream* rows = created->build_rows();
+  RETURN_IF_ERROR(rows->Init(join_node_id_, profile(), true));
+  bool have_write_buffer;
+  RETURN_IF_ERROR(rows->PrepareForWrite(&have_write_buffer));
+  if (have_write_buffer) return Status::OK();
+  return runtime_state_->block_mgr()->MemLimitTooLowError(
+      block_mgr_client_, join_node_id_);
+}
+
+// Creates PARTITION_FANOUT new partitions at 'level' and appends them to
+// 'hash_partitions_', which must be empty on entry. Also switches 'ht_ctx_' to 'level'.
+Status PhjBuilder::CreateHashPartitions(int level) {
+  DCHECK(hash_partitions_.empty());
+  // Selects the hash function used for partitioning the input.
+  ht_ctx_->set_level(level);
+
+  int num_created = 0;
+  while (num_created < PARTITION_FANOUT) {
+    Partition* partition = NULL;
+    RETURN_IF_ERROR(CreateAndPreparePartition(level, &partition));
+    hash_partitions_.push_back(partition);
+    ++num_created;
+  }
+
+  COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
+  COUNTER_SET(max_partition_level_, level);
+  return Status::OK();
+}
+
+// Appends 'row' to 'stream', freeing memory as needed until the append succeeds.
+// Each iteration first tries to switch a small-buffer stream to IO-sized buffers and
+// retry the append; if that does not work a partition is spilled and the append is
+// retried. Returns an error if AddRow() reports one or if SpillPartition() fails
+// (e.g. no partition is left to spill).
+Status PhjBuilder::AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row) {
+  Status status;
+  while (true) {
+    // Check if the stream is still using small buffers and try to switch to IO-buffers.
+    if (stream->using_small_buffers()) {
+      bool got_buffer;
+      RETURN_IF_ERROR(stream->SwitchToIoBuffers(&got_buffer));
+      if (got_buffer) {
+        if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
+        // AddRow() failed: propagate a real error, otherwise fall through and spill.
+        RETURN_IF_ERROR(status);
+      }
+    }
+    // We ran out of memory. Pick a partition to spill. If we ran out of unspilled
+    // partitions, SpillPartition() will return an error status.
+    RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    if (stream->AddRow(row, &status)) return Status::OK();
+    RETURN_IF_ERROR(status);
+    // Spilling one partition does not guarantee we can append a row. Keep
+    // spilling until we can append this row.
+  }
+}
+
+// TODO: can we do better with a different spilling heuristic?
+// Spills the open, not-yet-spilled hash partition with the largest in-memory footprint
+// (pinned build rows plus the hash table, if one exists) using unpin mode 'mode'.
+// Returns a mem-limit-too-low error if no partition with a non-zero footprint exists.
+Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) {
+  DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);
+  int victim_idx = -1;
+  int64_t victim_mem = 0;
+
+  // Scan all partitions and remember the one that would free the most memory.
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    Partition* p = hash_partitions_[i];
+    if (p->IsClosed() || p->is_spilled()) continue;
+
+    int64_t footprint = p->build_rows()->bytes_in_mem(false);
+    if (p->hash_tbl() != NULL) {
+      // The hash table should not have matches, since we have not probed it yet.
+      // Losing match info would lead to incorrect results (IMPALA-1488).
+      DCHECK(!p->hash_tbl()->HasMatches());
+      footprint += p->hash_tbl()->ByteSize();
+    }
+    if (footprint <= victim_mem) continue;
+    victim_mem = footprint;
+    victim_idx = i;
+  }
+
+  if (victim_idx == -1) {
+    // Could not find a partition to spill. This means the mem limit was just too low.
+    return runtime_state_->block_mgr()->MemLimitTooLowError(
+        block_mgr_client_, join_node_id_);
+  }
+
+  VLOG(2) << "Spilling partition: " << victim_idx << endl << DebugString();
+  return hash_partitions_[victim_idx]->Spill(mode);
+}
+
+// When this function is called, we've finished processing the current build input
+// (either from the child ExecNode or from repartitioning a spilled partition). The build
+// rows have only been partitioned, we still need to build hash tables over them. Some
+// of the partitions could have already been spilled and attempting to build hash
+// tables over the non-spilled ones can cause them to spill.
+//
+// At the end of the function all partitions either have a hash table (and therefore are
+// not spilled) or are spilled. Partitions that have hash tables don't need to spill on
+// the probe side.
+//
+// This maps perfectly to a 0-1 knapsack where the weight is the memory to keep the
+// build rows and hash table and the value is the expected IO savings.
+// For now, we go with a greedy solution.
+//
+// TODO: implement the knapsack solution.
+Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
+  DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
+
+  // First pass: close empty partitions and fully unpin the build rows of partitions
+  // that were already spilled while partitioning.
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    Partition* partition = hash_partitions_[i];
+    if (partition->build_rows()->num_rows() == 0) {
+      // This partition is empty, no need to do anything else.
+      partition->Close(NULL);
+    } else if (partition->is_spilled()) {
+      // We don't need any build-side data for spilled partitions in memory.
+      RETURN_IF_ERROR(
+          partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+    }
+  }
+
+  // Allocate probe buffers for all partitions that are already spilled. Do this before
+  // building hash tables because allocating probe buffers may cause some more partitions
+  // to be spilled. This avoids wasted work on building hash tables for partitions that
+  // won't fit in memory alongside the required probe buffers.
+  RETURN_IF_ERROR(InitSpilledPartitionProbeStreams());
+
+  // Second pass: try to build a hash table for every partition that is still open and
+  // in memory, spilling the partition if the hash table does not fit.
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    Partition* partition = hash_partitions_[i];
+    if (partition->IsClosed() || partition->is_spilled()) continue;
+
+    bool built = false;
+    DCHECK(partition->build_rows()->is_pinned());
+    RETURN_IF_ERROR(partition->BuildHashTable(&built));
+    // If we did not have enough memory to build this hash table, we need to spill this
+    // partition (clean up the hash table, unpin build).
+    if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL));
+  }
+
+  // We may have spilled additional partitions while building hash tables, we need to
+  // initialize probe buffers for them.
+  // TODO: once we have reliable reservations (IMPALA-3200) this should no longer be
+  // necessary: we will know exactly how many partitions will fit in memory and we can
+  // avoid building then immediately destroying hash tables.
+  RETURN_IF_ERROR(InitSpilledPartitionProbeStreams());
+
+  // TODO: at this point we could have freed enough memory to pin and build some
+  // spilled partitions. This can happen, for example if there is a lot of skew.
+  // Partition 1: 10GB (pinned initially).
+  // Partition 2,3,4: 1GB (spilled during partitioning the build).
+  // In the previous step, we could have unpinned 10GB (because there was not enough
+  // memory to build a hash table over it) which can now free enough memory to
+  // build hash tables over the remaining 3 partitions.
+  // We start by spilling the largest partition though so the build input would have
+  // to be pretty pathological.
+  // Investigate if this is worthwhile.
+  return Status::OK();
+}
+
+// Ensures 'spilled_partition_probe_streams_' holds one write-prepared probe stream per
+// open spilled hash partition. Streams created by earlier calls are counted, so only
+// the missing ones are created. Getting a write buffer for a new stream may require
+// spilling further partitions, each of which then needs a probe stream of its own.
+Status PhjBuilder::InitSpilledPartitionProbeStreams() {
+  DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
+
+  int num_spilled_partitions = 0;
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    Partition* partition = hash_partitions_[i];
+    if (!partition->IsClosed() && partition->is_spilled()) ++num_spilled_partitions;
+  }
+  int probe_streams_to_create =
+      num_spilled_partitions - spilled_partition_probe_streams_.size();
+
+  while (probe_streams_to_create > 0) {
+    // Create stream in vector, so that it will be cleaned up after any failure.
+    spilled_partition_probe_streams_.emplace_back(std::make_unique<BufferedTupleStream>(
+        runtime_state_, probe_row_desc_, runtime_state_->block_mgr(), block_mgr_client_,
+        false /* use_initial_small_buffers */, false /* read_write */));
+    BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get();
+    RETURN_IF_ERROR(probe_stream->Init(join_node_id_, profile(), false));
+
+    // Loop until either the stream gets a buffer or all partitions are spilled (in which
+    // case SpillPartition() returns an error).
+    while (true) {
+      bool got_buffer;
+      RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer));
+      if (got_buffer) break;
+
+      RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL));
+      // The partition that was just spilled needs a probe stream too.
+      ++probe_streams_to_create;
+    }
+    --probe_streams_to_create;
+  }
+  return Status::OK();
+}
+
+// Hands ownership of all prepared spilled-partition probe streams to the caller by
+// moving them out of 'spilled_partition_probe_streams_'.
+vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() {
+  vector<unique_ptr<BufferedTupleStream>> transferred(
+      std::move(spilled_partition_probe_streams_));
+  return transferred;
+}
+
+// Closes every partition and spilled-partition probe stream owned by the builder and
+// drops all references to them.
+void PhjBuilder::CloseAndDeletePartitions() {
+  for (unique_ptr<Partition>& owned_partition : all_partitions_) {
+    owned_partition->Close(NULL);
+  }
+  all_partitions_.clear();
+  // These only held raw pointers to partitions owned by 'all_partitions_'.
+  hash_partitions_.clear();
+  null_aware_partition_ = NULL;
+
+  for (unique_ptr<BufferedTupleStream>& probe_stream : spilled_partition_probe_streams_) {
+    probe_stream->Close();
+  }
+  spilled_partition_probe_streams_.clear();
+}
+
+// Allocates a scratch Bloom filter from the filter bank for every entry in 'filters_'
+// and stores it in that entry's 'local_bloom_filter'.
+void PhjBuilder::AllocateRuntimeFilters() {
+  DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filters_.size() == 0)
+      << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
+  DCHECK(ht_ctx_ != NULL);
+  for (FilterContext& filter_ctx : filters_) {
+    filter_ctx.local_bloom_filter =
+        runtime_state_->filter_bank()->AllocateScratchBloomFilter(
+            filter_ctx.filter->id());
+  }
+}
+
+// Publishes every runtime filter to the filter bank and records in the profile how
+// many were published and how many were disabled. A filter whose estimated
+// false-positive rate is too high is published as an always-true filter.
+void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
+  // Use 'num_build_rows' to estimate FP-rate of each Bloom filter, and publish
+  // 'always-true' filters if it's too high. Doing so saves CPU at the coordinator,
+  // serialisation time, and reduces the cost of applying the filter at the scan - most
+  // significantly for per-row filters. However, the number of build rows could be a very
+  // poor estimate of the NDV - particularly if the filter expression is a function of
+  // several columns.
+  // TODO: Better heuristic.
+  int32_t num_published = 0;
+  for (const FilterContext& filter_ctx : filters_) {
+    // TODO: Consider checking actual number of bits set in filter to compute FP rate.
+    // TODO: Consider checking this every few batches or so.
+    const bool disable_filter = runtime_state_->filter_bank()->FpRateTooHigh(
+        filter_ctx.filter->filter_size(), num_build_rows);
+    runtime_state_->filter_bank()->UpdateFilterFromLocal(filter_ctx.filter->id(),
+        disable_filter ? BloomFilter::ALWAYS_TRUE_FILTER : filter_ctx.local_bloom_filter);
+    if (!disable_filter) ++num_published;
+  }
+
+  // Nothing to report if this join has no runtime filters.
+  if (filters_.size() == 0) return;
+
+  const char* plural_suffix = filters_.size() == 1 ? "" : "s";
+  string info_string;
+  if (num_published != filters_.size()) {
+    info_string = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
+        num_published, filters_.size(), plural_suffix, filters_.size() - num_published);
+  } else {
+    info_string = Substitute(
+        "$0 of $0 Runtime Filter$1 Published", filters_.size(), plural_suffix);
+  }
+  profile()->AddInfoString("Runtime filters", info_string);
+}
+
+// Repartitions the build rows of 'input_partition' into a new set of hash partitions
+// at 'level' (which must be >= 1) and then builds hash tables via FlushFinal().
+// 'input_partition' is closed once all of its rows have been read. Before the hash
+// tables are built, a read buffer is acquired for 'input_probe_rows', spilling
+// partitions if necessary. Returns an error if a read buffer for the build rows cannot
+// be obtained, if the query is cancelled or in an error state, or if any stream or
+// spill operation fails.
+Status PhjBuilder::RepartitionBuildInput(
+    Partition* input_partition, int level, BufferedTupleStream* input_probe_rows) {
+  DCHECK_GE(level, 1);
+  SCOPED_TIMER(repartition_timer_);
+  COUNTER_ADD(num_repartitions_, 1);
+  RuntimeState* state = runtime_state_;
+
+  // Setup the read buffer and the new partitions.
+  BufferedTupleStream* build_rows = input_partition->build_rows();
+  DCHECK(build_rows != NULL);
+  bool got_read_buffer;
+  RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer));
+  if (!got_read_buffer) {
+    return mem_tracker()->MemLimitExceeded(
+        state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, join_node_id_));
+  }
+  RETURN_IF_ERROR(CreateHashPartitions(level));
+
+  // Repartition the build rows of 'input_partition' into 'hash_partitions_'.
+  RowBatch build_batch(row_desc_, state->batch_size(), mem_tracker());
+  bool eos = false;
+  while (!eos) {
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(state->CheckQueryState());
+
+    RETURN_IF_ERROR(build_rows->GetNext(&build_batch, &eos));
+    RETURN_IF_ERROR(Send(state, &build_batch));
+    build_batch.Reset();
+  }
+
+  // Done reading the input, we can safely close it now to free memory.
+  input_partition->Close(NULL);
+
+  // We just freed up the buffer used for reading build rows. Ensure a buffer is
+  // allocated for reading probe rows before we build the hash tables in FlushFinal().
+  // TODO: once we have reliable reservations (IMPALA-3200) we can just hand off the
+  // reservation and avoid this complication.
+  while (true) {
+    bool got_buffer;
+    // The status must be checked: on failure 'got_buffer' may not have been set.
+    RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer));
+    if (got_buffer) break;
+    RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+  }
+
+  RETURN_IF_ERROR(FlushFinal(state));
+  return Status::OK();
+}
+
+// Returns the largest build row count among the open hash partitions, or 0 if there
+// are no open hash partitions.
+int64_t PhjBuilder::LargestPartitionRows() const {
+  int64_t largest = 0;
+  for (Partition* partition : hash_partitions_) {
+    DCHECK(partition != NULL);
+    if (partition->IsClosed()) continue;
+    const int64_t rows = partition->build_rows()->num_rows();
+    largest = max(largest, rows);
+  }
+  return largest;
+}
+
+// Returns true for right outer, right anti and full outer joins, and for any join in
+// which at least one entry of 'is_not_distinct_from_' is true.
+bool PhjBuilder::HashTableStoresNulls() const {
+  if (join_op_ == TJoinOp::RIGHT_OUTER_JOIN) return true;
+  if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN) return true;
+  if (join_op_ == TJoinOp::FULL_OUTER_JOIN) return true;
+  for (bool not_distinct : is_not_distinct_from_) {
+    if (not_distinct) return true;
+  }
+  return false;
+}
+
+// Creates an unspilled partition at 'level' whose build row stream uses the row
+// descriptor and block manager client of 'parent'.
+PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level)
+  : parent_(parent), is_spilled_(false), level_(level) {
+  // Small initial buffers are only used at level 0: when repartitioning we can assume
+  // the build input is fairly large, so small buffers would most likely waste memory.
+  const bool is_first_level = (level == 0);
+  build_rows_ = std::make_unique<BufferedTupleStream>(state, parent->row_desc_,
+      state->block_mgr(), parent->block_mgr_client_,
+      is_first_level /* use_initial_small_buffers */, false /* read_write */);
+}
+
+// Partitions must be closed with Close() before they are destroyed; the destructor
+// only verifies this in debug builds.
+PhjBuilder::Partition::~Partition() {
+  DCHECK(IsClosed());
+}
+
+// Estimates the memory needed to hold this partition in memory: the byte size of the
+// build rows plus the estimated size of a hash table over that many rows.
+int64_t PhjBuilder::Partition::EstimatedInMemSize() const {
+  const int64_t build_rows_bytes = build_rows_->byte_size();
+  const int64_t hash_table_bytes = HashTable::EstimateSize(build_rows_->num_rows());
+  return build_rows_bytes + hash_table_bytes;
+}
+
+// Closes the partition; a no-op if it is already closed. The hash table, if any, is
+// closed after its collision count is added to the parent's counter. If 'batch' is
+// non-NULL, ownership of the build row stream is transferred to it; otherwise the
+// stream is closed here.
+void PhjBuilder::Partition::Close(RowBatch* batch) {
+  if (IsClosed()) return;
+
+  if (hash_tbl_ != NULL) {
+    COUNTER_ADD(parent_->num_hash_collisions_, hash_tbl_->NumHashCollisions());
+    hash_tbl_->Close();
+  }
+
+  if (build_rows_ == NULL) return;
+  if (batch != NULL) {
+    // The batch takes over the stream.
+    batch->AddTupleStream(build_rows_.release());
+  } else {
+    build_rows_->Close();
+  }
+  build_rows_.reset();
+}
+
+// Spills this open partition: destroys its hash table (if any), unpins the build rows
+// using 'mode' and tries to move the stream from small to IO-sized buffers. The first
+// time a partition is spilled the parent's spilled-partition counter is bumped, and
+// the "Spilled" exec option is added when that counter reaches 1.
+Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) {
+  DCHECK(!IsClosed());
+  // Release the hash table's memory before doing anything else.
+  if (hash_tbl() != NULL) {
+    hash_tbl_->Close();
+    hash_tbl_.reset();
+  }
+
+  // Unpin the stream as soon as possible to increase the chances that the
+  // SwitchToIoBuffers() call below will succeed.
+  RETURN_IF_ERROR(build_rows_->UnpinStream(mode));
+
+  if (build_rows_->using_small_buffers()) {
+    bool switched_to_io_buffers;
+    RETURN_IF_ERROR(build_rows_->SwitchToIoBuffers(&switched_to_io_buffers));
+    if (!switched_to_io_buffers) {
+      // We'll try again to get the buffers when the stream fills up the small buffers.
+      VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition "
+                 << this << " of join=" << parent_->join_node_id_
+                 << " build small buffers=" << build_rows_->using_small_buffers();
+      VLOG_FILE << GetStackTrace();
+    }
+  }
+
+  // Only the first spill of a partition is counted.
+  if (is_spilled_) return Status::OK();
+  COUNTER_ADD(parent_->num_spilled_partitions_, 1);
+  if (parent_->num_spilled_partitions_->value() == 1) {
+    parent_->profile()->AppendExecOption("Spilled");
+  }
+  is_spilled_ = true;
+  return Status::OK();
+}
+
+// Tries to pin this partition's build rows and build a hash table over them.
+// On an OK return, '*built' is true if the hash table was fully built and false if
+// there was not enough memory (the stream could not be pinned, the hash table could
+// not be initialized, or a batch could not be inserted). In the latter two cases any
+// partially built hash table is destroyed but the build rows are left as they are;
+// the caller is expected to spill the partition. A non-OK status is returned for
+// stream or query errors.
+Status PhjBuilder::Partition::BuildHashTable(bool* built) {
+  SCOPED_TIMER(parent_->build_hash_table_timer_);
+  DCHECK(build_rows_ != NULL);
+  *built = false;
+
+  // Before building the hash table, we need to pin the rows in memory.
+  RETURN_IF_ERROR(build_rows_->PinStream(false, built));
+  if (!*built) return Status::OK();
+
+  RuntimeState* state = parent_->runtime_state_;
+  HashTableCtx* ctx = parent_->ht_ctx_.get();
+  ctx->set_level(level()); // Set the hash function for building the hash table.
+  RowBatch batch(parent_->row_desc_, state->batch_size(), parent_->mem_tracker());
+  vector<BufferedTupleStream::RowIdx> indices;
+  bool eos = false;
+
+  // Allocate the partition-local hash table. Initialize the number of buckets based on
+  // the number of build rows (the number of rows is known at this point). This assumes
+  // there are no duplicates which can be wrong. However, the upside in the common case
+  // (few/no duplicates) is large and the downside when there are is low (a bit more
+  // memory; the bucket memory is small compared to the memory needed for all the build
+  // side allocations).
+  // One corner case is if the stream contains tuples with zero footprint (no materialized
+  // slots). If the tuples occupy no space, this implies all rows will be duplicates, so
+  // create a small hash table, IMPALA-2256.
+  //
+  // TODO: Try to allocate the hash table before pinning the stream to avoid needlessly
+  // reading all of the spilled rows from disk when we won't succeed anyway.
+  int64_t estimated_num_buckets = build_rows()->RowConsumesMemory() ?
+      HashTable::EstimateNumBuckets(build_rows()->num_rows()) :
+      state->batch_size() * 2;
+  hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_,
+      true /* store_duplicates */, parent_->row_desc_.tuple_descriptors().size(),
+      build_rows(), 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets));
+  if (!hash_tbl_->Init()) goto not_built;
+
+  bool got_read_buffer;
+  RETURN_IF_ERROR(build_rows_->PrepareForRead(false, &got_read_buffer));
+  DCHECK(got_read_buffer) << "Stream was already pinned.";
+  do {
+    RETURN_IF_ERROR(build_rows_->GetNext(&batch, &eos, &indices));
+    DCHECK_EQ(batch.num_rows(), indices.size());
+    DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
+        << build_rows()->RowConsumesMemory();
+    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+    // Use the codegen'd insert function matching this partition's level if codegen
+    // succeeded, otherwise fall back to the interpreted InsertBatch().
+    if (parent_->insert_batch_fn_ != NULL) {
+      InsertBatchFn insert_batch_fn;
+      if (level() == 0) {
+        insert_batch_fn = parent_->insert_batch_fn_level0_;
+      } else {
+        insert_batch_fn = parent_->insert_batch_fn_;
+      }
+      DCHECK(insert_batch_fn != NULL);
+      if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) {
+        goto not_built;
+      }
+    } else {
+      if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) goto not_built;
+    }
+    RETURN_IF_ERROR(state->GetQueryStatus());
+    // Free any local allocations made while inserting.
+    ExprContext::FreeLocalAllocations(parent_->expr_ctxs_to_free_);
+    batch.Reset();
+  } while (!eos);
+
+  // The hash table fits in memory and is built.
+  DCHECK(*built);
+  DCHECK(hash_tbl_ != NULL);
+  is_spilled_ = false;
+  COUNTER_ADD(parent_->num_hash_buckets_, hash_tbl_->num_buckets());
+  return Status::OK();
+
+// Reached when the hash table could not be initialized or a batch could not be
+// inserted: report "not built" and release whatever hash table memory was allocated.
+not_built:
+  *built = false;
+  if (hash_tbl_ != NULL) {
+    hash_tbl_->Close();
+    hash_tbl_.reset();
+  }
+  return Status::OK();
+}
+
+// Attempts to codegen the build-side batch processing and hash table insertion
+// functions if codegen is enabled, and records the outcome of each in the profile.
+// Failure to codegen is not an error; it is only reported in the profile.
+void PhjBuilder::Codegen(RuntimeState* state) {
+  Status build_codegen_status, insert_codegen_status;
+  bool build_codegen_enabled = false;
+  bool insert_codegen_enabled = false;
+
+  if (state->codegen_enabled()) {
+    // Codegen the helpers shared by both functions: the two row hashing variants built
+    // from the builder's hash table context, and the build row evaluation.
+    Function* hash_fn;
+    Function* murmur_hash_fn;
+    Function* eval_build_row_fn;
+    Status helper_status = ht_ctx_->CodegenHashRow(runtime_state_, false, &hash_fn);
+    helper_status.MergeStatus(
+        ht_ctx_->CodegenHashRow(runtime_state_, true, &murmur_hash_fn));
+    helper_status.MergeStatus(
+        ht_ctx_->CodegenEvalRow(runtime_state_, true, &eval_build_row_fn));
+
+    if (!helper_status.ok()) {
+      build_codegen_status = helper_status;
+      insert_codegen_status = helper_status;
+    } else {
+      build_codegen_status =
+          CodegenProcessBuildBatch(hash_fn, murmur_hash_fn, eval_build_row_fn);
+      insert_codegen_status =
+          CodegenInsertBatch(hash_fn, murmur_hash_fn, eval_build_row_fn);
+    }
+    build_codegen_enabled = build_codegen_status.ok();
+    insert_codegen_enabled = insert_codegen_status.ok();
+  }
+
+  profile()->AddCodegenMsg(build_codegen_enabled, build_codegen_status, "Build Side");
+  profile()->AddCodegenMsg(
+      insert_codegen_enabled, insert_codegen_status, "Hash Table Construction");
+}
+
+// Returns a human-readable summary of all hash partitions: whether each is closed or
+// spilled and, for open partitions, the build row count, pinned block count and hash
+// table size (if a hash table exists).
+string PhjBuilder::DebugString() const {
+  stringstream out;
+  out << "Hash partitions: " << hash_partitions_.size() << ":" << endl;
+  int next_idx = 0;
+  for (Partition* partition : hash_partitions_) {
+    const int idx = next_idx++;
+    out << " Hash partition " << idx << " ptr=" << partition;
+    if (partition->IsClosed()) {
+      out << " Closed";
+      continue;
+    }
+    if (partition->is_spilled()) out << " Spilled";
+
+    DCHECK(partition->build_rows() != NULL);
+    BufferedTupleStream* rows = partition->build_rows();
+    out << endl
+        << "    Build Rows: " << rows->num_rows()
+        << " (Blocks pinned: " << rows->blocks_pinned() << ")" << endl;
+    if (partition->hash_tbl() != NULL) {
+      out << "    Hash Table Rows: " << partition->hash_tbl()->size() << endl;
+    }
+  }
+  return out.str();
+}
+
+// Codegens two specialized copies of ProcessBuildBatch() and registers them with the
+// JIT so that 'process_build_batch_fn_' and 'process_build_batch_fn_level0_' are set
+// once compilation completes:
+//  - the level-0 version hashes with 'hash_fn' and builds runtime filters iff there
+//    are any filters;
+//  - the version for all other levels hashes with 'murmur_hash_fn' and never builds
+//    filters.
+// Both versions use 'eval_row_fn' to evaluate build rows. Returns an error if the
+// codegen object is unavailable, if the hash table constants cannot be replaced, or if
+// either generated function fails verification.
+Status PhjBuilder::CodegenProcessBuildBatch(
+    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
+  LlvmCodeGen* codegen;
+  RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
+
+  Function* process_build_batch_fn =
+      codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true);
+  DCHECK(process_build_batch_fn != NULL);
+
+  // Replace call sites
+  int replaced =
+      codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn, "EvalBuildRow");
+  DCHECK_EQ(replaced, 1);
+
+  // Replace some hash table parameters with constants.
+  HashTableCtx::HashTableReplacedConstants replaced_constants;
+  const bool stores_duplicates = true;
+  const int num_build_tuples = row_desc_.tuple_descriptors().size();
+  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(runtime_state_, stores_duplicates,
+      num_build_tuples, process_build_batch_fn, &replaced_constants));
+  DCHECK_GE(replaced_constants.stores_nulls, 1);
+  DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
+  DCHECK_EQ(replaced_constants.stores_duplicates, 0);
+  DCHECK_EQ(replaced_constants.stores_tuples, 0);
+  DCHECK_EQ(replaced_constants.quadratic_probing, 0);
+
+  // Clone after the shared replacements above so that both versions inherit them; the
+  // replacements below are specific to each version.
+  Function* process_build_batch_fn_level0 =
+      codegen->CloneFunction(process_build_batch_fn);
+
+  // Always build runtime filters at level0 (if there are any).
+  // Note that the first argument of this function is the return value.
+  Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 4);
+  build_filters_l0_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));
+
+  // process_build_batch_fn_level0 uses CRC hash if available,
+  replaced =
+      codegen->ReplaceCallSites(process_build_batch_fn_level0, hash_fn, "HashRow");
+  DCHECK_EQ(replaced, 1);
+
+  // process_build_batch_fn uses murmur
+  replaced =
+      codegen->ReplaceCallSites(process_build_batch_fn, murmur_hash_fn, "HashRow");
+  DCHECK_EQ(replaced, 1);
+
+  // Never build filters after repartitioning, as all rows have already been added to the
+  // filters during the level0 build. Note that the first argument of this function is the
+  // return value.
+  Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 4);
+  build_filters_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt1Ty(codegen->context()), false));
+
+  // Finalize ProcessBuildBatch functions
+  process_build_batch_fn = codegen->FinalizeFunction(process_build_batch_fn);
+  if (process_build_batch_fn == NULL) {
+    return Status(
+        "Codegen'd PhjBuilder::ProcessBuildBatch() function "
+        "failed verification, see log");
+  }
+  process_build_batch_fn_level0 =
+      codegen->FinalizeFunction(process_build_batch_fn_level0);
+  if (process_build_batch_fn_level0 == NULL) {
+    return Status(
+        "Codegen'd level-zero PhjBuilder::ProcessBuildBatch() "
+        "function failed verification, see log");
+  }
+
+  // Register native function pointers
+  codegen->AddFunctionToJit(
+      process_build_batch_fn, reinterpret_cast<void**>(&process_build_batch_fn_));
+  codegen->AddFunctionToJit(process_build_batch_fn_level0,
+      reinterpret_cast<void**>(&process_build_batch_fn_level0_));
+  return Status::OK();
+}
+
+// Codegens two specialized copies of InsertBatch() and registers them with the JIT so
+// that 'insert_batch_fn_' and 'insert_batch_fn_level0_' are set once compilation
+// completes. The level-0 version hashes with 'hash_fn', the other with
+// 'murmur_hash_fn'. Both use 'eval_row_fn' to evaluate build rows, a codegen'd
+// Equals() and the query's prefetch mode baked in as a constant. Returns an error if
+// the codegen object is unavailable, if Equals() cannot be codegen'd, if the hash
+// table constants cannot be replaced, or if either function fails verification.
+// NOTE(review): the error messages below still name PartitionedHashJoinNode although
+// this method belongs to PhjBuilder - confirm whether they should be updated.
+Status PhjBuilder::CodegenInsertBatch(
+    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
+  LlvmCodeGen* codegen;
+  RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
+
+  Function* insert_batch_fn = codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true);
+  Function* build_equals_fn;
+  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(runtime_state_, true, &build_equals_fn));
+
+  // Replace the parameter 'prefetch_mode' with constant.
+  Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
+  TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
+  DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
+  DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
+  prefetch_mode_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
+  // Use codegen'd EvalBuildRow() function
+  int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
+  DCHECK_EQ(replaced, 1);
+
+  // Use codegen'd Equals() function
+  replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals");
+  DCHECK_EQ(replaced, 1);
+
+  // Replace hash-table parameters with constants.
+  HashTableCtx::HashTableReplacedConstants replaced_constants;
+  const bool stores_duplicates = true;
+  const int num_build_tuples = row_desc_.tuple_descriptors().size();
+  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(runtime_state_, stores_duplicates,
+      num_build_tuples, insert_batch_fn, &replaced_constants));
+  DCHECK_GE(replaced_constants.stores_nulls, 1);
+  DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
+  DCHECK_GE(replaced_constants.stores_duplicates, 1);
+  DCHECK_GE(replaced_constants.stores_tuples, 1);
+  DCHECK_GE(replaced_constants.quadratic_probing, 1);
+
+  // Clone only after the replacements shared by both versions have been applied; the
+  // hash function replacement below is what differs between the two.
+  Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn);
+
+  // Use codegen'd hash functions
+  replaced = codegen->ReplaceCallSites(insert_batch_fn_level0, hash_fn, "HashRow");
+  DCHECK_EQ(replaced, 1);
+  replaced = codegen->ReplaceCallSites(insert_batch_fn, murmur_hash_fn, "HashRow");
+  DCHECK_EQ(replaced, 1);
+
+  insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn);
+  if (insert_batch_fn == NULL) {
+    return Status(
+        "PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd "
+        "InsertBatch() function failed verification, see log");
+  }
+  insert_batch_fn_level0 = codegen->FinalizeFunction(insert_batch_fn_level0);
+  if (insert_batch_fn_level0 == NULL) {
+    return Status(
+        "PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd zero-level "
+        "InsertBatch() function failed verification, see log");
+  }
+
+  // Register native function pointers.
+  codegen->AddFunctionToJit(insert_batch_fn, reinterpret_cast<void**>(&insert_batch_fn_));
+  codegen->AddFunctionToJit(
+      insert_batch_fn_level0, reinterpret_cast<void**>(&insert_batch_fn_level0_));
+  return Status::OK();
+}


[3/6] incubator-impala git commit: IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
new file mode 100644
index 0000000..23822b2
--- /dev/null
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -0,0 +1,460 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H
+#define IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H
+
+#include <boost/scoped_ptr.hpp>
+#include <memory>
+
+#include "common/object-pool.h"
+#include "common/status.h"
+#include "exec/data-sink.h"
+#include "exec/filter-context.h"
+#include "exec/hash-table.h"
+#include "runtime/buffered-block-mgr.h"
+#include "runtime/buffered-tuple-stream.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+namespace impala {
+
+class ExprContext;
+class RowDescriptor;
+class RuntimeState;
+
+/// The build side for the PartitionedHashJoinNode. Build-side rows are hash-partitioned
+/// into PARTITION_FANOUT partitions, with partitions spilled if the full build side
+/// does not fit in memory. Spilled partitions can be repartitioned with a different
+/// hash function per level of repartitioning.
+///
+/// The builder owns the hash tables and build row streams. The builder first does the
+/// level 0 partitioning of build rows. After FlushFinal() the builder has produced some
+/// in-memory partitions and some spilled partitions. The in-memory partitions have hash
+/// tables and the spilled partitions have a probe-side stream prepared with one write
+/// buffer, which is sufficient to spill the partition's probe rows to disk without
+/// allocating additional buffers.
+///
+/// After this initial partitioning, the join node probes the in-memory hash partitions.
+/// The join node then drives processing of any spilled partitions, calling
+/// Partition::BuildHashTable() to build hash tables for a spilled partition or calling
+/// RepartitionBuildInput() to repartition a level n partition into multiple level n + 1
+/// partitions.
+///
+/// Both the PartitionedHashJoinNode and the builder share a BufferedBlockMgr client
+/// and the corresponding reservations. Different stages of the spilling algorithm
+/// require different mixes of build and probe buffers and hash tables, so we can
+/// share the reservation to minimize the combined memory requirement. Initial probe-side
+/// buffers are allocated in the builder then handed off to the probe side to implement
+/// this reservation sharing.
+///
+/// TODO: after we have reliable reservations (IMPALA-3200), we can simplify the handoff
+///   to the probe side by using reservations instead of preparing the streams.
+///
+/// The full hash join algorithm is documented in PartitionedHashJoinNode.
+class PhjBuilder : public DataSink {
+ public:
+  class Partition;
+
+  PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor& probe_row_desc,
+      const RowDescriptor& build_row_desc, RuntimeState* state);
+
+  Status Init(RuntimeState* state, const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+      const std::vector<TRuntimeFilterDesc>& filters);
+
+  /// Implementations of DataSink interface methods.
+  virtual std::string GetName() override;
+  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status Send(RuntimeState* state, RowBatch* batch) override;
+  virtual Status FlushFinal(RuntimeState* state) override;
+  virtual void Close(RuntimeState* state) override;
+
+  /////////////////////////////////////////
+  // The following functions are used only by PartitionedHashJoinNode.
+  /////////////////////////////////////////
+
+  /// Reset the builder to the same state as it was in after calling Open().
+  void Reset();
+
+  /// Transfer ownership of the probe streams to the caller. One stream was allocated per
+  /// spilled partition in FlushFinal(). The probe streams are empty but prepared for
+  /// writing with a write buffer allocated.
+  std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams();
+
+  /// Clears the current list of hash partitions. Called after probing of the partitions
+  /// is done. The partitions are not closed or destroyed, since they may be spilled or
+  /// may contain unmatched build rows for certain join modes (e.g. right outer join).
+  void ClearHashPartitions() { hash_partitions_.clear(); }
+
+  /// Close the null aware partition (if there is one) and set it to NULL.
+  void CloseNullAwarePartition(RowBatch* out_batch) {
+    if (null_aware_partition_ != NULL) {
+      null_aware_partition_->Close(out_batch);
+      null_aware_partition_ = NULL;
+    }
+  }
+
+  /// Creates new hash partitions and repartitions 'input_partition'. The previous
+  /// hash partitions must have been cleared with ClearHashPartitions().
+  /// 'level' is the level new partitions should be created with. This function prepares
+  /// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase
+  /// has enough buffers preallocated to execute successfully.
+  Status RepartitionBuildInput(
+      Partition* input_partition, int level, BufferedTupleStream* input_probe_rows);
+
+  /// Returns the largest build row count out of the current hash partitions.
+  int64_t LargestPartitionRows() const;
+
+  /// True if the hash table may contain rows with one or more NULL join keys. This
+  /// depends on the join type and the equijoin conjuncts.
+  bool HashTableStoresNulls() const;
+
+  /// Accessor functions, mainly required to expose state to PartitionedHashJoinNode.
+  inline BufferedBlockMgr::Client* block_mgr_client() const { return block_mgr_client_; }
+  inline bool non_empty_build() const { return non_empty_build_; }
+  inline const std::vector<bool>& is_not_distinct_from() const {
+    return is_not_distinct_from_;
+  }
+  inline int num_hash_partitions() const { return hash_partitions_.size(); }
+  inline Partition* hash_partition(int partition_idx) const {
+    DCHECK_GE(partition_idx, 0);
+    DCHECK_LT(partition_idx, hash_partitions_.size());
+    return hash_partitions_[partition_idx];
+  }
+  inline Partition* null_aware_partition() const { return null_aware_partition_; }
+
+  std::string DebugString() const;
+
+  /// Number of initial partitions to create. Must be a power of two.
+  static const int PARTITION_FANOUT = 16;
+
+  /// Needs to be log2(PARTITION_FANOUT).
+  static const int NUM_PARTITIONING_BITS = 4;
+
+  /// Maximum number of times we will repartition. The maximum build table we
+  /// can process is:
+  /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). E.g. a (low) 1GB limit
+  /// with a fanout of 64 and a depth of 3 already supports 256TB build tables in
+  /// the case where there is no skew; the constants here allow far more.
+  /// In the case where there is skew, repartitioning is unlikely to help (assuming a
+  /// reasonable hash function).
+  /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx.
+  /// TODO: we can revisit and try harder to explicitly detect skew.
+  static const int MAX_PARTITION_DEPTH = 16;
+
+  /// A partition containing a subset of build rows.
+  ///
+  /// A partition may contain two data structures: the build rows and optionally a hash
+  /// table built over the build rows.  Building the hash table requires all build rows to
+  /// be pinned in memory. The build rows are kept in memory if all partitions fit in
+  /// memory, but can be unpinned and spilled to disk to free up memory. Reading or
+  /// writing the unpinned rows requires a single read or write buffer. If the unpinned
+  /// rows are not being read or written, they can be completely unpinned, requiring no
+  /// buffers.
+  ///
+  /// The build input is first partitioned by hash function level 0 into the level 0
+  /// partitions. Then, if the join spills and the size of a level n partition is too
+  /// large to fit in memory, the partition's rows can be repartitioned with the level
+  /// n + 1 hash function into the level n + 1 partitions.
+  class Partition {
+   public:
+    Partition(RuntimeState* state, PhjBuilder* parent, int level);
+    ~Partition();
+
+    /// Close the partition and attach resources to 'batch' if non-NULL or free the
+    /// resources if 'batch' is NULL. Idempotent.
+    void Close(RowBatch* batch);
+
+    /// Returns the estimated byte size of the in-memory data structures for this
+    /// partition. This includes all build rows and the hash table.
+    int64_t EstimatedInMemSize() const;
+
+    /// Pins the build tuples for this partition and constructs the hash table from it.
+    /// Build rows cannot be added after calling this. If the build rows could not be
+    /// pinned or the hash table could not be built due to memory pressure, sets *built
+    /// to false and returns OK. Returns an error status if any other error is
+    /// encountered.
+    Status BuildHashTable(bool* built);
+
+    /// Spills this partition, the partition's stream is unpinned with 'mode' and
+    /// its hash table is destroyed if it was built.
+    Status Spill(BufferedTupleStream::UnpinMode mode);
+
+    bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; }
+    BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
+    HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
+    bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
+    int ALWAYS_INLINE level() const { return level_; }
+
+   private:
+    /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array
+    /// containing each row's index into the hash table's tuple stream.
+    /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash
+    /// table buckets which the rows hash to will be prefetched. This parameter is
+    /// replaced with a constant during codegen time. This function may be replaced with
+    /// a codegen'd version. Returns true if all rows in 'batch' are successfully
+    /// inserted.
+    bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
+        RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices);
+
+    const PhjBuilder* parent_;
+
+    /// True if this partition is spilled.
+    bool is_spilled_;
+
+    /// How many times rows in this partition have been repartitioned. Partitions created
+    /// from the node's children's input are level 0, 1 after the first repartitioning,
+    /// etc.
+    const int level_;
+
+    /// The hash table for this partition.
+    boost::scoped_ptr<HashTable> hash_tbl_;
+
+    /// Stream of build tuples in this partition. Initially owned by this object but
+    /// transferred to the parent exec node (via the row batch) when the partition
+    /// is closed. If NULL, ownership has been transferred and the partition is closed.
+    std::unique_ptr<BufferedTupleStream> build_rows_;
+  };
+
+ private:
+  /// Computes the minimum number of buffers required to execute the spilling partitioned
+  /// hash algorithm successfully for any input size (assuming enough disk space is
+  /// available for spilled rows). The buffers are used for buffering both build and
+  /// probe rows at different times, so the total requirement is the peak sum of build
+  /// and probe buffers required.
+  /// We need one output buffer per partition to partition the build or probe side. We
+  /// need one additional buffer for the input while repartitioning the build or probe.
+  /// For NAAJ, we need 3 additional buffers for 'null_aware_partition_',
+  /// 'null_aware_probe_partition_' and 'null_probe_rows_'.
+  int MinRequiredBuffers() const {
+    int num_reserved_buffers = PARTITION_FANOUT + 1;
+    if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) num_reserved_buffers += 3;
+    return num_reserved_buffers;
+  }
+
+  /// Create and initialize a set of hash partitions for partitioning level 'level'.
+  /// The previous hash partitions must have been cleared with ClearHashPartitions().
+  /// After calling this, batches are added to the new partitions by calling Send().
+  Status CreateHashPartitions(int level);
+
+  /// Create a new partition in 'all_partitions_' and prepare it for writing.
+  Status CreateAndPreparePartition(int level, Partition** partition);
+
+  /// Reads the rows in build_batch and partitions them into hash_partitions_. If
+  /// 'build_filters' is true, runtime filters are populated.
+  Status ProcessBuildBatch(RowBatch* build_batch, HashTableCtx* ctx, bool build_filters);
+
+  /// Append 'row' to 'stream'. In the common case, appending the row to the stream
+  /// immediately succeeds. Otherwise this function falls back to the slower path of
+  /// AppendRowStreamFull(), which may spill partitions to free memory. Returns an error
+  /// if it was unable to append the row, even after spilling partitions.
+  Status AppendRow(BufferedTupleStream* stream, TupleRow* row);
+
+  /// Slow path for AppendRow() above. It is called when the stream has failed to append
+  /// the row. We need to find more memory by either switching to IO-buffers, in case the
+  /// stream still uses small buffers, or spilling a partition.
+  Status AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row);
+
+  /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed
+  /// to the Spill() call for the selected partition. The current policy is to spill the
+  /// largest partition. Returns non-ok status if we couldn't spill a partition.
+  Status SpillPartition(BufferedTupleStream::UnpinMode mode);
+
+  /// Tries to build hash tables for all unspilled hash partitions. Called after
+  /// FlushFinal() when all build rows have been partitioned and added to the appropriate
+  /// streams. If the hash table could not be built for a partition, the partition is
+  /// spilled (with all build blocks unpinned) and the probe stream is prepared for
+  /// writing (i.e. has an initial probe buffer allocated).
+  ///
+  /// When this function returns successfully, each partition is in one of these states:
+  /// 1. closed. No probe partition is created and the build partition is closed.
+  /// 2. in-memory. The build rows are pinned and a hash table is built. No probe
+  ///     partition is created.
+  /// 3. spilled. The build rows are fully unpinned and the probe stream is prepared.
+  Status BuildHashTablesAndPrepareProbeStreams();
+
+  /// Ensures that 'spilled_partition_probe_streams_' has a stream per spilled partition
+  /// in 'hash_partitions_'. May spill additional partitions until it can create enough
+  /// probe streams with write buffers. Returns an error if an error is encountered or
+  /// if it runs out of partitions to spill.
+  Status InitSpilledPartitionProbeStreams();
+
+  /// Calls Close() on every Partition, deletes them, and cleans up any pointers that
+  /// may reference them. Also cleans up 'spilled_partition_probe_streams_'.
+  void CloseAndDeletePartitions();
+
+  /// For each filter in filters_, allocate a bloom filter from the fragment-local
+  /// RuntimeFilterBank and store it in that filter's FilterContext, to be populated
+  /// during the build phase.
+  void AllocateRuntimeFilters();
+
+  /// Publish the runtime filters to the fragment-local RuntimeFilterBank.
+  /// 'num_build_rows' is used to determine whether the computed filters have an
+  /// unacceptably high false-positive rate.
+  void PublishRuntimeFilters(int64_t num_build_rows);
+
+  /// Does all codegen for the builder (if codegen is enabled).
+  /// Updates the builder's runtime profile with info about whether the codegen was
+  /// enabled and whether any errors occurred during codegen.
+  void Codegen(RuntimeState* state);
+
+  /// Codegen processing build batches. Identical signature to ProcessBuildBatch().
+  /// Returns non-OK status if codegen was not possible.
+  Status CodegenProcessBuildBatch(llvm::Function* hash_fn, llvm::Function* murmur_hash_fn,
+      llvm::Function* eval_row_fn);
+
+  /// Codegen inserting batches into a partition's hash table. Identical signature to
+  /// Partition::InsertBatch(). Returns non-OK if codegen was not possible.
+  Status CodegenInsertBatch(llvm::Function* hash_fn, llvm::Function* murmur_hash_fn,
+      llvm::Function* eval_row_fn);
+
+  RuntimeState* const runtime_state_;
+
+  // The ID of the plan join node this is associated with.
+  // TODO: we may want to replace this with a sink ID once we progress further with
+  // multithreading.
+  const int join_node_id_;
+
+  /// The join operation this is building for.
+  const TJoinOp::type join_op_;
+
+  /// Descriptor for the probe rows, needed to initialize probe streams.
+  const RowDescriptor& probe_row_desc_;
+
+  /// Pool for objects with same lifetime as builder.
+  ObjectPool pool_;
+
+  /// Client to the buffered block mgr, used to allocate build partition buffers and hash
+  /// tables. When probing, the spilling algorithm keeps some build partitions in memory
+  /// while using memory for probe buffers for spilled partitions. To support dynamically
+  /// dividing memory between build and probe, this client is owned by the builder but
+  /// shared with the PartitionedHashJoinNode.
+  /// TODO: this approach to sharing will not work for spilling broadcast joins with a
+  /// 1:N relationship from builders to join nodes.
+  BufferedBlockMgr::Client* block_mgr_client_;
+
+  /// If true, the build side has at least one row.
+  bool non_empty_build_;
+
+  /// Expr contexts to free after partitioning or inserting each batch.
+  std::vector<ExprContext*> expr_ctxs_to_free_;
+
+  /// Expression contexts over input rows for hash table build.
+  std::vector<ExprContext*> build_expr_ctxs_;
+
+  /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
+  /// NOT DISTINCT FROM, rather than equality.
+  std::vector<bool> is_not_distinct_from_;
+
+  /// List of filters to build.
+  std::vector<FilterContext> filters_;
+
+  /// Used for hash-related functionality, such as evaluating rows and calculating hashes.
+  /// The level is set to the same level as 'hash_partitions_'.
+  boost::scoped_ptr<HashTableCtx> ht_ctx_;
+
+  /// Total number of partitions created.
+  RuntimeProfile::Counter* partitions_created_;
+
+  /// The largest fraction (of build side) after repartitioning. This is expected to be
+  /// 1 / PARTITION_FANOUT. A value much larger indicates skew.
+  RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
+
+  /// Level of max partition (i.e. number of repartitioning steps).
+  RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
+
+  /// Number of build rows that have been partitioned.
+  RuntimeProfile::Counter* num_build_rows_partitioned_;
+
+  /// Number of hash collisions - unequal rows that have identical hash values
+  RuntimeProfile::Counter* num_hash_collisions_;
+
+  /// Total number of hash buckets across all partitions.
+  RuntimeProfile::Counter* num_hash_buckets_;
+
+  /// Number of partitions that have been spilled.
+  RuntimeProfile::Counter* num_spilled_partitions_;
+
+  /// Number of partitions that have been repartitioned.
+  RuntimeProfile::Counter* num_repartitions_;
+
+  /// Time spent partitioning build rows.
+  RuntimeProfile::Counter* partition_build_rows_timer_;
+
+  /// Time spent building hash tables.
+  RuntimeProfile::Counter* build_hash_table_timer_;
+
+  /// Time spent repartitioning and building hash tables of any resulting partitions
+  /// that were not spilled.
+  RuntimeProfile::Counter* repartition_timer_;
+
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// Vector that owns all of the Partition objects.
+  std::vector<std::unique_ptr<Partition>> all_partitions_;
+
+  /// The current set of partitions that are being built or probed. This vector is
+  /// initialized before partitioning or re-partitioning the build input
+  /// and cleared after we've finished probing the partitions.
+  /// This is not used when processing a single spilled partition.
+  std::vector<Partition*> hash_partitions_;
+
+  /// Partition used for null-aware joins. This partition is always processed at the end
+  /// after all build and probe rows are processed. In this partition's 'build_rows_', we
+  /// store all the rows for which 'build_expr_ctxs_' evaluated over the row returns NULL
+  /// (i.e. it has a NULL on the eq join slot).
+  /// NULL if the join is not null aware or we are done processing this partition.
+  Partition* null_aware_partition_;
+
+  /// Populated during the hash table building phase if any partitions spilled.
+  /// One probe stream per spilled partition is prepared for writing so that the
+  /// initial write buffer is allocated.
+  ///
+  /// These streams are handed off to PartitionedHashJoinNode for use in buffering
+  /// spilled probe rows. The allocation is done in the builder so that it can divide
+  /// memory between the in-memory build partitions and write buffers based on the size
+  /// of the partitions and available memory. E.g. if all the partitions fit in memory, no
+  /// write buffers need to be allocated, but if some partitions are spilled, more build
+  /// partitions may be spilled to free up memory for write buffers.
+  ///
+  /// Because of this, at the end of the build phase, we always have sufficient memory
+  /// to execute the probe phase of the algorithm without spilling more partitions.
+  std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+
+  /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
+  /// and is used when the partition level is 0, otherwise xxx_fn_ uses murmur hash and is
+  /// used for subsequent levels.
+  typedef Status (*ProcessBuildBatchFn)(
+      PhjBuilder*, RowBatch*, HashTableCtx*, bool build_filters);
+  /// Jitted ProcessBuildBatch function pointers.  NULL if codegen is disabled.
+  ProcessBuildBatchFn process_build_batch_fn_;
+  ProcessBuildBatchFn process_build_batch_fn_level0_;
+
+  typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, RowBatch*,
+      const std::vector<BufferedTupleStream::RowIdx>&);
+  /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
+  InsertBatchFn insert_batch_fn_;
+  InsertBatchFn insert_batch_fn_level0_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index bab1bf8..44cb14b 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -19,10 +19,7 @@
 
 #include "codegen/impala-ir.h"
 #include "exec/hash-table.inline.h"
-#include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
-#include "runtime/runtime-filter.h"
-#include "util/bloom-filter.h"
 
 #include "common/names.h"
 
@@ -151,12 +148,11 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins(
       // should interpret the hash table probe as "unknown" if there are nulls on the
       // build side. For those rows, we need to process the remaining join
       // predicates later.
-      if (null_aware_partition_->build_rows()->num_rows() != 0) {
-        if (num_other_join_conjuncts > 0 &&
-            UNLIKELY(!AppendRow(null_aware_partition_->probe_rows(),
-                         current_probe_row_, status))) {
-          DCHECK(!status->ok());
-          return false;
+      if (builder_->null_aware_partition()->build_rows()->num_rows() != 0) {
+        if (num_other_join_conjuncts > 0) {
+          *status = AppendProbeRow(
+              null_aware_probe_partition_->probe_rows(), current_probe_row_);
+          if (UNLIKELY(!status->ok())) return false;
         }
         return true;
       }
@@ -271,7 +267,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
 
     // Fetch the hash and expr values' nullness for this row.
     if (expr_vals_cache->IsRowNull()) {
-      if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && non_empty_build_) {
+      if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && builder_->non_empty_build()) {
         const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
         // For NAAJ, we need to treat NULLs on the probe carefully. The logic is:
         // 1. No build rows -> Return this row. The check for 'non_empty_build_'
@@ -284,14 +280,12 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
         if (num_other_join_conjuncts == 0) {
           // Condition 2 above.
           skip_row = true;
-        } else if (LIKELY(AppendRow(null_probe_rows_, current_probe_row_, status))) {
+        } else {
           // Condition 3 above.
+          *status = AppendProbeRow(null_probe_rows_.get(), current_probe_row_);
+          if (UNLIKELY(!status->ok())) return false;
           matched_null_probe_.push_back(false);
           skip_row = true;
-        } else {
-          // Condition 3 above but failed to append to 'null_probe_rows_'. Bail out.
-          DCHECK(!status->ok());
-          return false;
         }
       }
     } else {
@@ -300,20 +294,20 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
         hash_tbl_iterator_ = hash_tbl->FindProbeRow(ht_ctx);
       } else {
         // The build partition is either empty or spilled.
-        Partition* partition = hash_partitions_[partition_idx];
-        // This partition is closed, meaning the build side for this partition was empty.
-        if (UNLIKELY(partition->is_closed())) {
-          DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
+        PhjBuilder::Partition* build_partition = builder_->hash_partition(partition_idx);
+        ProbePartition* probe_partition = probe_hash_partitions_[partition_idx].get();
+        DCHECK((build_partition->IsClosed() && probe_partition == NULL)
+            || (build_partition->is_spilled() && probe_partition != NULL));
+
+        if (UNLIKELY(probe_partition == NULL)) {
+          // A closed partition implies that the build side for this partition was empty.
         } else {
-          // This partition is not in memory, spill the probe row and move to the next row.
-          DCHECK(partition->is_spilled());
-          DCHECK(partition->probe_rows() != NULL);
+          // The partition is not in memory, spill the probe row and move to the next row.
           // Skip the current row if we manage to append to the spilled partition's BTS.
           // Otherwise, we need to bail out and report the failure.
-          if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
-            DCHECK(!status->ok());
-            return false;
-          }
+          BufferedTupleStream* probe_rows = probe_partition->probe_rows();
+          *status = AppendProbeRow(probe_rows, current_probe_row_);
+          if (UNLIKELY(!status->ok())) return false;
           skip_row = true;
         }
       }
@@ -358,9 +352,11 @@ void IR_ALWAYS_INLINE PartitionedHashJoinNode::EvalAndHashProbePrefetchGroup(
 }
 
 // CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by codegen.
-template<int const JoinOp>
+template <int const JoinOp>
 int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode,
     RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
+  DCHECK(state_ == PARTITIONING_PROBE || state_ == PROBING_SPILLED_PARTITION
+      || state_ == REPARTITIONING_PROBE);
   ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
   const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
@@ -430,82 +426,15 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
   return num_rows_added;
 }
 
-Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
-    bool build_filters) {
-  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx_->expr_values_cache();
-  expr_vals_cache->Reset();
-  FOREACH_ROW(build_batch, 0, build_batch_iter) {
-    DCHECK(build_status_.ok());
-    TupleRow* build_row = build_batch_iter.Get();
-    if (!ht_ctx_->EvalAndHashBuild(build_row)) {
-      if (null_aware_partition_ != NULL) {
-        // TODO: remove with codegen/template
-        // If we are NULL aware and this build row has NULL in the eq join slot,
-        // append it to the null_aware partition. We will need it later.
-        if (UNLIKELY(!AppendRow(null_aware_partition_->build_rows(),
-                build_row, &build_status_))) {
-          return build_status_;
-        }
-      }
-      continue;
-    }
-    if (build_filters) {
-      DCHECK_EQ(ht_ctx_->level(), 0)
-          << "Runtime filters should not be built during repartitioning.";
-      for (const FilterContext& ctx: filters_) {
-        // TODO: codegen expr evaluation and hashing
-        if (ctx.local_bloom_filter == NULL) continue;
-        void* e = ctx.expr->GetValue(build_row);
-        uint32_t filter_hash = RawValue::GetHashValue(e, ctx.expr->root()->type(),
-            RuntimeFilterBank::DefaultHashSeed());
-        ctx.local_bloom_filter->Insert(filter_hash);
-      }
-    }
-    const uint32_t hash = expr_vals_cache->CurExprValuesHash();
-    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-    Partition* partition = hash_partitions_[partition_idx];
-    const bool result = AppendRow(partition->build_rows(), build_row, &build_status_);
-    if (UNLIKELY(!result)) return build_status_;
-  }
-  return Status::OK();
-}
-
-bool PartitionedHashJoinNode::Partition::InsertBatch(
-    TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx, RowBatch* batch,
-    const vector<BufferedTupleStream::RowIdx>& indices) {
-  // Compute the hash values and prefetch the hash table buckets.
-  const int num_rows = batch->num_rows();
-  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
-  const int prefetch_size = expr_vals_cache->capacity();
-  const BufferedTupleStream::RowIdx* row_indices = indices.data();
-  for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
-       prefetch_group_row += prefetch_size) {
-    int cur_row = prefetch_group_row;
-    expr_vals_cache->Reset();
-    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
-      if (ht_ctx->EvalAndHashBuild(batch_iter.Get())) {
-        if (prefetch_mode != TPrefetchMode::NONE) {
-          hash_tbl_->PrefetchBucket<false>(expr_vals_cache->CurExprValuesHash());
-        }
-      } else {
-        expr_vals_cache->SetRowNull();
-      }
-      expr_vals_cache->NextRow();
-    }
-    // Do the insertion.
-    expr_vals_cache->ResetForRead();
-    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
-      TupleRow* row = batch_iter.Get();
-      BufferedTupleStream::RowIdx row_idx = row_indices[cur_row];
-      if (!expr_vals_cache->IsRowNull() &&
-          UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) {
-        return false;
-      }
-      expr_vals_cache->NextRow();
-      ++cur_row;
-    }
-  }
-  return true;
+inline Status PartitionedHashJoinNode::AppendProbeRow(
+    BufferedTupleStream* stream, TupleRow* row) {
+  DCHECK(stream->has_write_block());
+  DCHECK(!stream->using_small_buffers());
+  DCHECK(!stream->is_pinned());
+  Status status;
+  if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
+  DCHECK(!status.ok());
+  return status;
 }
 
 template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>(


[5/6] incubator-impala git commit: IMPALA-3308: Get expr-test passing on PPC64LE

Posted by ta...@apache.org.
IMPALA-3308: Get expr-test passing on PPC64LE

When using gcc 5+ (which introduced a new library ABI that includes new
implementations of std::string) to build Impala, the copy of the class
ExprValue(std::string) would be unsafe as string_val.ptr will not be
updated to point to the relocated string_data.data().

In order to solve this issue, we need to change how we initialize value_
so that it is initialized in-place, rather than created as a temporary
on the stack and then copied.

Change-Id: I4504ee6a52a085f530aadfcfa009bacb83c64787
Reviewed-on: http://gerrit.cloudera.org:8080/4186
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9cee2b5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9cee2b5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9cee2b5f

Branch: refs/heads/master
Commit: 9cee2b5f1376ad6286ed65edffe6d152a0012cf1
Parents: 1ccd83b
Author: segelyang <zh...@cn.ibm.com>
Authored: Wed Aug 31 18:13:17 2016 +0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Sep 28 23:09:28 2016 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-value.h |  9 +++++----
 be/src/exprs/literal.cc   | 14 ++++++--------
 2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9cee2b5f/be/src/exprs/expr-value.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-value.h b/be/src/exprs/expr-value.h
index cbe1361..a2336d3 100644
--- a/be/src/exprs/expr-value.h
+++ b/be/src/exprs/expr-value.h
@@ -67,10 +67,9 @@ struct ExprValue {
   ExprValue(double v) : double_val(v) {}
   ExprValue(int64_t t, int64_t n) : timestamp_val(t, n) {}
 
-  /// c'tor for string values
-  ExprValue(const std::string& str)
-    : string_data(str) {
-    string_val.ptr = const_cast<char*>(string_data.data());
+  void Init(const std::string& str) {
+    string_data = str;
+    string_val.ptr = &string_data[0];
     string_val.len = string_data.size();
   }
 
@@ -198,6 +197,8 @@ struct ExprValue {
 
  private:
   std::string string_data; // Stores the data for string_val if necessary.
+
+  DISALLOW_COPY_AND_ASSIGN(ExprValue);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9cee2b5f/be/src/exprs/literal.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/literal.cc b/be/src/exprs/literal.cc
index cc089da..1d816c3 100644
--- a/be/src/exprs/literal.cc
+++ b/be/src/exprs/literal.cc
@@ -74,7 +74,7 @@ Literal::Literal(const TExprNode& node)
     case TYPE_VARCHAR: {
       DCHECK_EQ(node.node_type, TExprNodeType::STRING_LITERAL);
       DCHECK(node.__isset.string_literal);
-      value_ = ExprValue(node.string_literal.value);
+      value_.Init(node.string_literal.value);
       if (type_.type == TYPE_VARCHAR) {
         value_.string_val.len = min(type_.len, value_.string_val.len);
       }
@@ -91,7 +91,7 @@ Literal::Literal(const TExprNode& node)
         // Pad out literal with spaces.
         str.replace(str_len, type_.len - str_len, type_.len - str_len, ' ');
       }
-      value_ = ExprValue(str);
+      value_.Init(str);
       break;
     }
     case TYPE_DECIMAL: {
@@ -186,16 +186,14 @@ Literal::Literal(ColumnType type, double v)
   }
 }
 
-Literal::Literal(ColumnType type, const string& v)
-  : Expr(type),
-    value_(v) {
+Literal::Literal(ColumnType type, const string& v) : Expr(type) {
+  value_.Init(v);
   DCHECK(type.type == TYPE_STRING || type.type == TYPE_CHAR || type.type == TYPE_VARCHAR)
       << type;
 }
 
-Literal::Literal(ColumnType type, const StringValue& v)
-  : Expr(type),
-    value_(v.DebugString()) {
+Literal::Literal(ColumnType type, const StringValue& v) : Expr(type) {
+  value_.Init(v.DebugString());
   DCHECK(type.type == TYPE_STRING || type.type == TYPE_CHAR) << type;
 }