You are viewing a plain-text version of this content. The canonical link ("here") was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/05 03:18:20 UTC

[09/11] incubator-impala git commit: IMPALA-4674: Part 2: port backend exec to BufferPool

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index e0393b5..912613d 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -26,8 +26,9 @@
 #include "exec/data-sink.h"
 #include "exec/filter-context.h"
 #include "exec/hash-table.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/buffered-tuple-stream.h"
+#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/suballocator.h"
 
 #include "gen-cpp/PlanNodes_types.h"
 
@@ -56,7 +57,7 @@ class ScalarExprEvaluator;
 /// RepartitionBuildInput() to repartition a level n partition into multiple level n + 1
 /// partitions.
 ///
-/// Both the PartitionedHashJoinNode and the builder share a BufferedBlockMgr client
+/// Both the PartitionedHashJoinNode and the builder share a BufferPool client
 /// and the corresponding reservations. Different stages of the spilling algorithm
 /// require different mixes of build and probe buffers and hash tables, so we can
 /// share the reservation to minimize the combined memory requirement. Initial probe-side
@@ -72,7 +73,8 @@ class PhjBuilder : public DataSink {
   class Partition;
 
   PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor* probe_row_desc,
-      const RowDescriptor* build_row_desc, RuntimeState* state);
+      const RowDescriptor* build_row_desc, RuntimeState* state,
+      BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size);
 
   Status InitExprsAndFilters(RuntimeState* state,
       const std::vector<TEqJoinCondition>& eq_join_conjuncts,
@@ -101,7 +103,7 @@ class PhjBuilder : public DataSink {
   /// Transfer ownership of the probe streams to the caller. One stream was allocated per
   /// spilled partition in FlushFinal(). The probe streams are empty but prepared for
   /// writing with a write buffer allocated.
-  std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams();
+  std::vector<std::unique_ptr<BufferedTupleStreamV2>> TransferProbeStreams();
 
   /// Clears the current list of hash partitions. Called after probing of the partitions
   /// is done. The partitions are not closed or destroyed, since they may be spilled or
@@ -122,7 +124,7 @@ class PhjBuilder : public DataSink {
   /// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase
   /// has enough buffers preallocated to execute successfully.
   Status RepartitionBuildInput(Partition* input_partition, int level,
-      BufferedTupleStream* input_probe_rows) WARN_UNUSED_RESULT;
+      BufferedTupleStreamV2* input_probe_rows) WARN_UNUSED_RESULT;
 
   /// Returns the largest build row count out of the current hash partitions.
   int64_t LargestPartitionRows() const;
@@ -132,7 +134,6 @@ class PhjBuilder : public DataSink {
   bool HashTableStoresNulls() const;
 
   /// Accessor functions, mainly required to expose state to PartitionedHashJoinNode.
-  inline BufferedBlockMgr::Client* block_mgr_client() const { return block_mgr_client_; }
   inline bool non_empty_build() const { return non_empty_build_; }
   inline const std::vector<bool>& is_not_distinct_from() const {
     return is_not_distinct_from_;
@@ -200,24 +201,27 @@ class PhjBuilder : public DataSink {
 
     /// Spills this partition, the partition's stream is unpinned with 'mode' and
     /// its hash table is destroyed if it was built.
-    Status Spill(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
+    Status Spill(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT;
 
     bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; }
-    BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
+    BufferedTupleStreamV2* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
     HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
     bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
     int ALWAYS_INLINE level() const { return level_; }
 
    private:
-    /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array
-    /// containing the index of each row's index into the hash table's tuple stream.
+    /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'flat_rows' is an array
+    /// containing the rows in the hash table's tuple stream.
     /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash
     /// table buckets which the rows hashes to will be prefetched. This parameter is
     /// replaced with a constant during codegen time. This function may be replaced with
     /// a codegen'd version. Returns true if all rows in 'batch' are successfully
-    /// inserted.
+    /// inserted and false otherwise. If inserting failed, 'status' indicates why it
+    /// failed: if 'status' is ok, inserting failed because not enough reservation
+    /// was available and if 'status' is an error, inserting failed because of that error.
     bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
-        RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices);
+        RowBatch* batch, const std::vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows,
+        Status* status);
 
     const PhjBuilder* parent_;
 
@@ -235,16 +239,9 @@ class PhjBuilder : public DataSink {
     /// Stream of build tuples in this partition. Initially owned by this object but
     /// transferred to the parent exec node (via the row batch) when the partition
     /// is closed. If NULL, ownership has been transferred and the partition is closed.
-    std::unique_ptr<BufferedTupleStream> build_rows_;
+    std::unique_ptr<BufferedTupleStreamV2> build_rows_;
   };
 
- protected:
-  /// Init() function inherited from DataSink. Overridden to be a no-op for now.
-  /// TODO: Merge with InitExprsAndFilters() once this class becomes a true data sink.
-  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
-      const TDataSink& tsink, RuntimeState* state) override;
-
- private:
   /// Computes the minimum number of buffers required to execute the spilling partitioned
   /// hash algorithm successfully for any input size (assuming enough disk space is
   /// available for spilled rows). The buffers are used for buffering both build and
@@ -255,15 +252,22 @@ class PhjBuilder : public DataSink {
   /// For NAAJ, we need 3 additional buffers for 'null_aware_partition_',
   /// 'null_aware_probe_partition_' and 'null_probe_rows_'.
   int MinRequiredBuffers() const {
-    // Must be kept in sync with HashJoinNode.computeResourceProfile() in fe.
+    // Must be kept in sync with HashJoinNode.computeNodeResourceProfile() in fe.
     int num_reserved_buffers = PARTITION_FANOUT + 1;
     if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) num_reserved_buffers += 3;
     return num_reserved_buffers;
   }
 
+ protected:
+  /// Init() function inherited from DataSink. Overridden to be a no-op for now.
+  /// TODO: Merge with InitExprsAndFilters() once this class becomes a true data sink.
+  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
+      const TDataSink& tsink, RuntimeState* state) override;
+
   /// Free local allocations made from expr evaluators during hash table construction.
   void FreeLocalAllocations() const;
 
+ private:
   /// Create and initialize a set of hash partitions for partitioning level 'level'.
   /// The previous hash partitions must have been cleared with ClearHashPartitions().
   /// After calling this, batches are added to the new partitions by calling Send().
@@ -284,19 +288,19 @@ class PhjBuilder : public DataSink {
   /// partitions. This odd return convention is used to avoid emitting unnecessary code
   /// for ~Status in perf-critical code.
   bool AppendRow(
-      BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
+      BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
 
   /// Slow path for AppendRow() above. It is called when the stream has failed to append
   /// the row. We need to find more memory by either switching to IO-buffers, in case the
   /// stream still uses small buffers, or spilling a partition. Returns false and sets
   /// 'status' if it was unable to append the row, even after spilling partitions.
-  bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row,
+  bool AppendRowStreamFull(BufferedTupleStreamV2* stream, TupleRow* row,
       Status* status) noexcept WARN_UNUSED_RESULT;
 
   /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed
   /// to the Spill() call for the selected partition. The current policy is to spill the
   /// largest partition. Returns non-ok status if we couldn't spill a partition.
-  Status SpillPartition(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
+  Status SpillPartition(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT;
 
   /// Tries to build hash tables for all unspilled hash partitions. Called after
   /// FlushFinal() when all build rows have been partitioned and added to the appropriate
@@ -358,14 +362,20 @@ class PhjBuilder : public DataSink {
   /// Pool for objects with same lifetime as builder.
   ObjectPool pool_;
 
-  /// Client to the buffered block mgr, used to allocate build partition buffers and hash
-  /// tables. When probing, the spilling algorithm keeps some build partitions in memory
-  /// while using memory for probe buffers for spilled partitions. To support dynamically
-  /// dividing memory between build and probe, this client is owned by the builder but
-  /// shared with the PartitionedHashJoinNode.
+  /// Client to the buffer pool, used to allocate build partition buffers and hash tables.
+  /// When probing, the spilling algorithm keeps some build partitions in memory while
+  /// using memory for probe buffers for spilled partitions. To support dynamically
+  /// dividing memory between build and probe, this client is shared between the builder
+  /// and the PartitionedHashJoinNode.
   /// TODO: this approach to sharing will not work for spilling broadcast joins with a
   /// 1:N relationship from builders to join nodes.
-  BufferedBlockMgr::Client* block_mgr_client_;
+  BufferPool::ClientHandle* buffer_pool_client_;
+
+  /// The size of buffers to use in the build and probe streams.
+  const int64_t spillable_buffer_size_;
+
+  /// Allocator for hash table memory.
+  boost::scoped_ptr<Suballocator> ht_allocator_;
 
   /// If true, the build side has at least one row.
   bool non_empty_build_;
@@ -454,7 +464,7 @@ class PhjBuilder : public DataSink {
   ///
   /// Because of this, at the end of the build phase, we always have sufficient memory
   /// to execute the probe phase of the algorithm without spilling more partitions.
-  std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_;
+  std::vector<std::unique_ptr<BufferedTupleStreamV2>> spilled_partition_probe_streams_;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////
@@ -469,7 +479,7 @@ class PhjBuilder : public DataSink {
   ProcessBuildBatchFn process_build_batch_fn_level0_;
 
   typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, RowBatch*,
-      const std::vector<BufferedTupleStream::RowIdx>&);
+      const std::vector<BufferedTupleStreamV2::FlatRowPtr>&, Status*);
   /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
   InsertBatchFn insert_batch_fn_;
   InsertBatchFn insert_batch_fn_level0_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index 2c951d1..b890eb9 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -313,7 +313,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
           // The partition is not in memory, spill the probe row and move to the next row.
           // Skip the current row if we manage to append to the spilled partition's BTS.
           // Otherwise, we need to bail out and report the failure.
-          BufferedTupleStream* probe_rows = probe_partition->probe_rows();
+          BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows();
           if (UNLIKELY(!AppendProbeRow(probe_rows, current_probe_row_, status))) {
             DCHECK(!status->ok());
             return false;
@@ -438,9 +438,8 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
 }
 
 inline bool PartitionedHashJoinNode::AppendProbeRow(
-    BufferedTupleStream* stream, TupleRow* row, Status* status) {
-  DCHECK(stream->has_write_block());
-  DCHECK(!stream->using_small_buffers());
+    BufferedTupleStreamV2* stream, TupleRow* row, Status* status) {
+  DCHECK(stream->has_write_iterator());
   DCHECK(!stream->is_pinned());
   return stream->AddRow(row, status);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index a5c9897..2db9e00 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -27,8 +27,7 @@
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/slot-ref.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -47,9 +46,15 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
     "successfully.";
 
 using namespace impala;
-using namespace llvm;
-using namespace strings;
-using std::unique_ptr;
+using llvm::BasicBlock;
+using llvm::ConstantInt;
+using llvm::Function;
+using llvm::GlobalValue;
+using llvm::LLVMContext;
+using llvm::PointerType;
+using llvm::Type;
+using llvm::Value;
+using strings::Substitute;
 
 PartitionedHashJoinNode::PartitionedHashJoinNode(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -77,8 +82,9 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
   // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
   // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
   // being separated out further.
-  builder_.reset(
-      new PhjBuilder(id(), join_op_, child(0)->row_desc(), child(1)->row_desc(), state));
+  builder_.reset(new PhjBuilder(id(), join_op_, child(0)->row_desc(),
+        child(1)->row_desc(), state, &buffer_pool_client_,
+        resource_profile_.spillable_buffer_size));
   RETURN_IF_ERROR(
       builder_->InitExprsAndFilters(state, eq_join_conjuncts, tnode.runtime_filters));
 
@@ -177,6 +183,11 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
 }
 
 Status PartitionedHashJoinNode::AcquireResourcesForBuild(RuntimeState* state) {
+  DCHECK_GE(resource_profile_.min_reservation,
+      resource_profile_.spillable_buffer_size * builder_->MinRequiredBuffers());
+  if (!buffer_pool_client_.is_registered()) {
+    RETURN_IF_ERROR(ClaimBufferReservation(state));
+  }
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     // Initialize these partitions before doing the build so that the build does not
     // use the reservation intended for them.
@@ -254,12 +265,10 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
 
 PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state,
     PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition,
-    unique_ptr<BufferedTupleStream> probe_rows)
-  : parent_(parent),
-    build_partition_(build_partition),
+    unique_ptr<BufferedTupleStreamV2> probe_rows)
+  : build_partition_(build_partition),
     probe_rows_(std::move(probe_rows)) {
-  DCHECK(probe_rows_->has_write_block());
-  DCHECK(!probe_rows_->using_small_buffers());
+  DCHECK(probe_rows_->has_write_iterator());
   DCHECK(!probe_rows_->is_pinned());
 }
 
@@ -270,10 +279,7 @@ PartitionedHashJoinNode::ProbePartition::~ProbePartition() {
 Status PartitionedHashJoinNode::ProbePartition::PrepareForRead() {
   bool got_read_buffer;
   RETURN_IF_ERROR(probe_rows_->PrepareForRead(true, &got_read_buffer));
-  if (!got_read_buffer) {
-    return parent_->mem_tracker()->MemLimitExceeded(parent_->runtime_state_,
-        Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, parent_->id_));
-  }
+  DCHECK(got_read_buffer) << "Accounted in min reservation";
   return Status::OK();
 }
 
@@ -322,7 +328,7 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
     probe_batch_pos_ = -1;
     return Status::OK();
   }
-  BufferedTupleStream* probe_rows = input_partition_->probe_rows();
+  BufferedTupleStreamV2* probe_rows = input_partition_->probe_rows();
   if (LIKELY(probe_rows->rows_returned() < probe_rows->num_rows())) {
     // Continue from the current probe stream.
     bool eos = false;
@@ -414,12 +420,11 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
     ht_ctx_->set_level(next_partition_level);
 
     // Spill to free memory from hash tables and pinned streams for use in new partitions.
-    RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStream::UNPIN_ALL));
+    RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStreamV2::UNPIN_ALL));
     // Temporarily free up the probe buffer to use when repartitioning.
-    RETURN_IF_ERROR(
-        input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
-    DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0) << NodeDebugString();
-    DCHECK_EQ(input_partition_->probe_rows()->blocks_pinned(), 0) << NodeDebugString();
+    input_partition_->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+    DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0) << NodeDebugString();
+    DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString();
     int64_t num_input_rows = build_partition->build_rows()->num_rows();
     RETURN_IF_ERROR(builder_->RepartitionBuildInput(
         build_partition, next_partition_level, input_partition_->probe_rows()));
@@ -430,7 +435,8 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
                                                          "more rows than the input";
     if (UNLIKELY(num_input_rows == largest_partition_rows)) {
       return Status(TErrorCode::PARTITIONED_HASH_JOIN_REPARTITION_FAILS, id_,
-          next_partition_level, num_input_rows);
+          next_partition_level, num_input_rows, NodeDebugString(),
+          buffer_pool_client_.DebugString());
     }
 
     RETURN_IF_ERROR(PrepareForProbe());
@@ -816,18 +822,18 @@ static Status NullAwareAntiJoinError(bool build) {
 
 Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
   RuntimeState* state = runtime_state_;
-  unique_ptr<BufferedTupleStream> probe_rows = std::make_unique<BufferedTupleStream>(
-      state, child(0)->row_desc(), state->block_mgr(), builder_->block_mgr_client(),
-      false /* use_initial_small_buffers */, false /* read_write */);
-  Status status = probe_rows->Init(id(), runtime_profile(), false);
+  unique_ptr<BufferedTupleStreamV2> probe_rows = make_unique<BufferedTupleStreamV2>(
+      state, child(0)->row_desc(), &buffer_pool_client_,
+      resource_profile_.spillable_buffer_size,
+      resource_profile_.spillable_buffer_size);
+  // TODO: this should be pinned if spilling is disabled.
+  Status status = probe_rows->Init(id(), false);
   if (!status.ok()) goto error;
   bool got_buffer;
   status = probe_rows->PrepareForWrite(&got_buffer);
   if (!status.ok()) goto error;
-  if (!got_buffer) {
-    status = state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id());
-    goto error;
-  }
+  DCHECK(got_buffer)
+      << "Accounted in min reservation" << buffer_pool_client_.DebugString();
   null_aware_probe_partition_.reset(new ProbePartition(
       state, this, builder_->null_aware_partition(), std::move(probe_rows)));
   return Status::OK();
@@ -841,15 +847,15 @@ error:
 
 Status PartitionedHashJoinNode::InitNullProbeRows() {
   RuntimeState* state = runtime_state_;
-  null_probe_rows_ = std::make_unique<BufferedTupleStream>(state, child(0)->row_desc(),
-      state->block_mgr(), builder_->block_mgr_client(),
-      false /* use_initial_small_buffers */, false /* read_write */);
-  RETURN_IF_ERROR(null_probe_rows_->Init(id(), runtime_profile(), false));
+  null_probe_rows_ = make_unique<BufferedTupleStreamV2>(state, child(0)->row_desc(),
+      &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+      resource_profile_.spillable_buffer_size);
+  // TODO: we shouldn't start with this unpinned if spilling is disabled.
+  RETURN_IF_ERROR(null_probe_rows_->Init(id(), false));
   bool got_buffer;
   RETURN_IF_ERROR(null_probe_rows_->PrepareForWrite(&got_buffer));
-  if (!got_buffer) {
-    return state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id());
-  }
+  DCHECK(got_buffer)
+      << "Accounted in min reservation" << buffer_pool_client_.DebugString();
   return Status::OK();
 }
 
@@ -860,8 +866,8 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
   DCHECK_EQ(probe_batch_pos_, -1);
   DCHECK_EQ(probe_batch_->num_rows(), 0);
 
-  BufferedTupleStream* build_stream = builder_->null_aware_partition()->build_rows();
-  BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
+  BufferedTupleStreamV2* build_stream = builder_->null_aware_partition()->build_rows();
+  BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows();
 
   if (build_stream->num_rows() == 0) {
     // There were no build rows. Nothing to do. Just prepare to output the null
@@ -874,7 +880,7 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
 
   // Bring the entire spilled build stream into memory and read into a single batch.
   bool got_rows;
-  RETURN_IF_ERROR(build_stream->GetRows(&nulls_build_batch_, &got_rows));
+  RETURN_IF_ERROR(build_stream->GetRows(mem_tracker(), &nulls_build_batch_, &got_rows));
   if (!got_rows) return NullAwareAntiJoinError(true);
 
   // Initialize the streams for read.
@@ -898,7 +904,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
   int num_join_conjuncts = other_join_conjuncts_.size();
   DCHECK(probe_batch_ != NULL);
 
-  BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
+  BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows();
   if (probe_batch_pos_ == probe_batch_->num_rows()) {
     probe_batch_pos_ = 0;
     probe_batch_->TransferResourceOwnership(out_batch);
@@ -946,7 +952,8 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
   DCHECK(probe_hash_partitions_.empty());
 
   // Initialize the probe partitions, providing them with probe streams.
-  vector<unique_ptr<BufferedTupleStream>> probe_streams = builder_->TransferProbeStreams();
+  vector<unique_ptr<BufferedTupleStreamV2>> probe_streams =
+      builder_->TransferProbeStreams();
   probe_hash_partitions_.resize(PARTITION_FANOUT);
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
     PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
@@ -982,16 +989,16 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
 }
 
 void PartitionedHashJoinNode::CreateProbePartition(
-    int partition_idx, unique_ptr<BufferedTupleStream> probe_rows) {
+    int partition_idx, unique_ptr<BufferedTupleStreamV2> probe_rows) {
   DCHECK_GE(partition_idx, 0);
   DCHECK_LT(partition_idx, probe_hash_partitions_.size());
   DCHECK(probe_hash_partitions_[partition_idx] == NULL);
-  probe_hash_partitions_[partition_idx] = std::make_unique<ProbePartition>(runtime_state_,
+  probe_hash_partitions_[partition_idx] = make_unique<ProbePartition>(runtime_state_,
       this, builder_->hash_partition(partition_idx), std::move(probe_rows));
 }
 
 Status PartitionedHashJoinNode::EvaluateNullProbe(
-    RuntimeState* state, BufferedTupleStream* build) {
+    RuntimeState* state, BufferedTupleStreamV2* build) {
   if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
     return Status::OK();
   }
@@ -1000,10 +1007,10 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(
   // Bring both the build and probe side into memory and do a pairwise evaluation.
   bool got_rows;
   scoped_ptr<RowBatch> build_rows;
-  RETURN_IF_ERROR(build->GetRows(&build_rows, &got_rows));
+  RETURN_IF_ERROR(build->GetRows(mem_tracker(), &build_rows, &got_rows));
   if (!got_rows) return NullAwareAntiJoinError(true);
   scoped_ptr<RowBatch> probe_rows;
-  RETURN_IF_ERROR(null_probe_rows_->GetRows(&probe_rows, &got_rows));
+  RETURN_IF_ERROR(null_probe_rows_->GetRows(mem_tracker(), &probe_rows, &got_rows));
   if (!got_rows) return NullAwareAntiJoinError(false);
 
   ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
@@ -1060,11 +1067,9 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
       // can recurse the algorithm and create new hash partitions from spilled partitions.
       // TODO: we shouldn't need to unpin the build stream if we stop spilling
       // while probing.
-      RETURN_IF_ERROR(
-          build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
-      DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0);
-      RETURN_IF_ERROR(
-          probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+      build_partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+      DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0);
+      probe_partition->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
 
       if (probe_partition->probe_rows()->num_rows() != 0
           || NeedToProcessUnmatchedBuildRows()) {
@@ -1102,9 +1107,9 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
 
   // Just finished evaluating the null probe rows with all the non-spilled build
   // partitions. Unpin this now to free this memory for repartitioning.
-  if (null_probe_rows_ != NULL)
-    RETURN_IF_ERROR(
-        null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+  if (null_probe_rows_ != NULL) {
+    null_probe_rows_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+  }
 
   builder_->ClearHashPartitions();
   probe_hash_partitions_.clear();
@@ -1165,10 +1170,10 @@ string PartitionedHashJoinNode::NodeDebugString() const {
     ss << "  Probe hash partition " << i << ": ";
     if (probe_partition != NULL) {
       ss << "probe ptr=" << probe_partition;
-      BufferedTupleStream* probe_rows = probe_partition->probe_rows();
+      BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows();
       if (probe_rows != NULL) {
-         ss << "    Probe Rows: " << probe_rows->num_rows()
-            << "    (Blocks pinned: " << probe_rows->blocks_pinned() << ")";
+        ss << "    Probe Rows: " << probe_rows->num_rows()
+           << "    (Bytes pinned: " << probe_rows->BytesPinned(false) << ")";
       }
     }
     ss << endl;
@@ -1189,12 +1194,15 @@ string PartitionedHashJoinNode::NodeDebugString() const {
     }
   }
   if (input_partition_ != NULL) {
-    DCHECK(input_partition_->build_partition()->build_rows() != NULL);
     DCHECK(input_partition_->probe_rows() != NULL);
-    ss << "InputPartition: " << input_partition_.get() << endl
-       << "   Spilled Build Rows: "
-       << input_partition_->build_partition()->build_rows()->num_rows() << endl
-       << "   Spilled Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl;
+    ss << "InputPartition: " << input_partition_.get() << endl;
+    PhjBuilder::Partition* build_partition = input_partition_->build_partition();
+    if (build_partition->IsClosed()) {
+      ss << "   Build Partition Closed" << endl;
+    } else {
+      ss << "   Build Rows: " << build_partition->build_rows()->num_rows() << endl;
+    }
+    ss << "   Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl;
   } else {
     ss << "InputPartition: NULL" << endl;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 73e0dd5..b3f663e 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -15,28 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H
 #define IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H
 
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread.hpp>
 #include <list>
 #include <memory>
 #include <string>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread.hpp>
 
 #include "exec/blocking-join-node.h"
 #include "exec/exec-node.h"
 #include "exec/partitioned-hash-join-builder.h"
-#include "runtime/buffered-block-mgr.h"
 
 #include "gen-cpp/Types_types.h"
 
 namespace impala {
 
 class BloomFilter;
-class BufferedBlockMgr;
-class BufferedTupleStream;
 class MemPool;
 class RowBatch;
 class RuntimeFilter;
@@ -100,8 +96,6 @@ class TupleRow;
 /// NULLs into several different streams, which are processed in a separate step to
 /// produce additional output rows. The NAAJ algorithm is documented in more detail in
 /// header comments for the null aware functions and data structures.
-///
-/// TODO: don't copy tuple rows so often.
 class PartitionedHashJoinNode : public BlockingJoinNode {
  public:
   PartitionedHashJoinNode(ObjectPool* pool, const TPlanNode& tnode,
@@ -168,7 +162,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Creates an initialized probe partition at 'partition_idx' in
   /// 'probe_hash_partitions_'.
   void CreateProbePartition(
-      int partition_idx, std::unique_ptr<BufferedTupleStream> probe_rows);
+      int partition_idx, std::unique_ptr<BufferedTupleStreamV2> probe_rows);
 
   /// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have
   /// a write buffer allocated, so this will succeed unless an error is encountered.
@@ -176,7 +170,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// return convention is used to avoid emitting unnecessary code for ~Status in perf-
   /// critical code.
   bool AppendProbeRow(
-      BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
+      BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
 
   /// Probes the hash table for rows matching the current probe row and appends
   /// all the matching build rows (with probe row) to output batch. Returns true
@@ -331,7 +325,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// conjuncts pass (i.e. there is a match).
   /// This is used for NAAJ, when there are NULL probe rows.
   Status EvaluateNullProbe(
-      RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT;
+      RuntimeState* state, BufferedTupleStreamV2* build) WARN_UNUSED_RESULT;
 
   /// Prepares to output NULLs on the probe side for NAAJ. Before calling this,
   /// matched_null_probe_ should have been fully evaluated.
@@ -478,7 +472,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// For NAAJ, this stream contains all probe rows that had NULL on the hash table
   /// conjuncts. Must be unique_ptr so we can release it and transfer to output batches.
-  std::unique_ptr<BufferedTupleStream> null_probe_rows_;
+  std::unique_ptr<BufferedTupleStreamV2> null_probe_rows_;
 
   /// For each row in null_probe_rows_, true if this row has matched any build row
   /// (i.e. the resulting joined row passes other_join_conjuncts).
@@ -510,7 +504,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
     /// that has been prepared for writing with an I/O-sized write buffer.
     ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent,
         PhjBuilder::Partition* build_partition,
-        std::unique_ptr<BufferedTupleStream> probe_rows);
+        std::unique_ptr<BufferedTupleStreamV2> probe_rows);
     ~ProbePartition();
 
     /// Prepare to read the probe rows. Allocates the first read block, so reads will
@@ -523,21 +517,19 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
     /// resources if 'batch' is NULL. Idempotent.
     void Close(RowBatch* batch);
 
-    BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); }
+    BufferedTupleStreamV2* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); }
     PhjBuilder::Partition* build_partition() { return build_partition_; }
 
     inline bool IsClosed() const { return probe_rows_ == NULL; }
 
    private:
-    PartitionedHashJoinNode* parent_;
-
     /// The corresponding build partition. Not NULL. Owned by PhjBuilder.
     PhjBuilder::Partition* build_partition_;
 
     /// Stream of probe tuples in this partition. Initially owned by this object but
     /// transferred to the parent exec node (via the row batch) when the partition
     /// is complete. If NULL, ownership was transferred and the partition is closed.
-    std::unique_ptr<BufferedTupleStream> probe_rows_;
+    std::unique_ptr<BufferedTupleStreamV2> probe_rows_;
   };
 
   /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h
index a53b40e..3441aac 100644
--- a/be/src/exec/partitioned-hash-join-node.inline.h
+++ b/be/src/exec/partitioned-hash-join-node.inline.h
@@ -20,7 +20,7 @@
 
 #include "exec/partitioned-hash-join-node.h"
 
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index fd42124..440f809 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -52,9 +52,12 @@ Status SortNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
-  sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_,
-      &row_descriptor_, mem_tracker(), runtime_profile(), state));
+  sorter_.reset(
+      new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_, mem_tracker(),
+          &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+          runtime_profile(), state, id(), true));
   RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
+  DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   AddCodegenDisabledMessage(state);
   return Status::OK();
 }
@@ -69,9 +72,13 @@ void SortNode::Codegen(RuntimeState* state) {
 
 Status SortNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
-  // Open the child before consuming resources in this node.
-  RETURN_IF_ERROR(child(0)->Open(state));
   RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_ERROR(child(0)->Open(state));
+  // Claim reservation after the child has been opened to reduce the peak reservation
+  // requirement.
+  if (!buffer_pool_client_.is_registered()) {
+    RETURN_IF_ERROR(ClaimBufferReservation(state));
+  }
   RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
   RETURN_IF_ERROR(sorter_->Open());
   RETURN_IF_CANCELLED(state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index 8b3de11..a11d424 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -20,13 +20,12 @@
 
 #include "exec/exec-node.h"
 #include "runtime/sorter.h"
-#include "runtime/buffered-block-mgr.h"
 
 namespace impala {
 
 /// Node that implements a full sort of its input with a fixed memory budget, spilling
 /// to disk if the input is larger than available memory.
-/// Uses Sorter and BufferedBlockMgr for the external sort implementation.
+/// Uses Sorter for the external sort implementation.
 /// Input rows to SortNode are materialized by the Sorter into a single tuple
 /// using the expressions specified in sort_tuple_exprs_.
 /// In GetNext(), SortNode passes in the output batch to the sorter instance created

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 2de0f2e..92af968 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -24,8 +24,6 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 
 add_library(Runtime
-  buffered-block-mgr.cc
-  buffered-tuple-stream.cc
   buffered-tuple-stream-v2.cc
   client-cache.cc
   coordinator.cc
@@ -45,6 +43,7 @@ add_library(Runtime
   hbase-table.cc
   hbase-table-factory.cc
   hdfs-fs-cache.cc
+  initial-reservations.cc
   lib-cache.cc
   mem-tracker.cc
   mem-pool.cc
@@ -83,7 +82,6 @@ ADD_BE_TEST(string-buffer-test)
 ADD_BE_TEST(data-stream-test)
 ADD_BE_TEST(timestamp-test)
 ADD_BE_TEST(disk-io-mgr-test)
-ADD_BE_TEST(buffered-block-mgr-test)
 ADD_BE_TEST(parallel-executor-test)
 ADD_BE_TEST(raw-value-test)
 ADD_BE_TEST(string-compare-test)
@@ -93,7 +91,6 @@ ADD_BE_TEST(thread-resource-mgr-test)
 ADD_BE_TEST(mem-tracker-test)
 ADD_BE_TEST(multi-precision-test)
 ADD_BE_TEST(decimal-test)
-ADD_BE_TEST(buffered-tuple-stream-test)
 ADD_BE_TEST(buffered-tuple-stream-v2-test)
 ADD_BE_TEST(hdfs-fs-cache-test)
 ADD_BE_TEST(tmp-file-mgr-test)