You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/05 03:18:22 UTC

[11/11] incubator-impala git commit: IMPALA-4674: Part 2: port backend exec to BufferPool

IMPALA-4674: Part 2: port backend exec to BufferPool

Always create global BufferPool at startup using 80% of memory and
limit reservations to 80% of query memory (same as BufferedBlockMgr).
The query's initial reservation is computed in the planner, claimed
centrally (managed by the InitialReservations class) and distributed
to query operators from there.

min_spillable_buffer_size and default_spillable_buffer_size query
options control the buffer size that the planner selects for
spilling operators.

Port ExecNodes to use BufferPool:
  * Each ExecNode has to claim its reservation during Open()
  * Port Sorter to use BufferPool.
  * Switch from BufferedTupleStream to BufferedTupleStreamV2
  * Port HashTable to use BufferPool via a Suballocator.

This also makes PAGG memory consumption more efficient (avoid wasting buffers)
and improve the spilling algorithm:
* Allow preaggs to execute with 0 reservation - if streams and hash tables
  cannot be allocated, it will pass through rows.
* Halve the buffer requirement for spilling aggs - avoid allocating
  buffers for aggregated and unaggregated streams simultaneously.
* Rebuild spilled partitions instead of repartitioning (IMPALA-2708)

TODO in follow-up patches:
* Rename BufferedTupleStreamV2 to BufferedTupleStream
* Implement max_row_size query option.

Testing:
* Updated tests to reflect new memory requirements

Change-Id: I7fc7fe1c04e9dfb1a0c749fb56a5e0f2bf9c6c3e
Reviewed-on: http://gerrit.cloudera.org:8080/5801
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a98b90bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a98b90bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a98b90bd

Branch: refs/heads/master
Commit: a98b90bd3877886e97dc2385cfdf5e3f95245533
Parents: d5b0c6b
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Mar 16 16:09:36 2016 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Aug 5 01:03:02 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py           |    2 +-
 be/src/exec/analytic-eval-node.cc               |   53 +-
 be/src/exec/analytic-eval-node.h                |   25 +-
 be/src/exec/exec-node.cc                        |   45 +-
 be/src/exec/exec-node.h                         |   17 +
 be/src/exec/hash-table-test.cc                  |  251 ++-
 be/src/exec/hash-table.cc                       |  149 +-
 be/src/exec/hash-table.h                        |  140 +-
 be/src/exec/hash-table.inline.h                 |   20 +-
 be/src/exec/nested-loop-join-builder.cc         |    3 +-
 be/src/exec/partial-sort-node.cc                |    7 +-
 be/src/exec/partial-sort-node.h                 |    1 -
 be/src/exec/partitioned-aggregation-node-ir.cc  |   20 +-
 be/src/exec/partitioned-aggregation-node.cc     |  639 ++++----
 be/src/exec/partitioned-aggregation-node.h      |  192 ++-
 be/src/exec/partitioned-hash-join-builder-ir.cc |   12 +-
 be/src/exec/partitioned-hash-join-builder.cc    |  159 +-
 be/src/exec/partitioned-hash-join-builder.h     |   76 +-
 be/src/exec/partitioned-hash-join-node-ir.cc    |    7 +-
 be/src/exec/partitioned-hash-join-node.cc       |  136 +-
 be/src/exec/partitioned-hash-join-node.h        |   26 +-
 be/src/exec/partitioned-hash-join-node.inline.h |    2 +-
 be/src/exec/sort-node.cc                        |   15 +-
 be/src/exec/sort-node.h                         |    3 +-
 be/src/runtime/CMakeLists.txt                   |    5 +-
 be/src/runtime/buffered-block-mgr-test.cc       | 1547 ------------------
 be/src/runtime/buffered-block-mgr.cc            | 1254 --------------
 be/src/runtime/buffered-block-mgr.h             |  606 -------
 be/src/runtime/buffered-tuple-stream-test.cc    | 1264 --------------
 be/src/runtime/buffered-tuple-stream.cc         |  903 ----------
 be/src/runtime/buffered-tuple-stream.h          |  561 -------
 be/src/runtime/buffered-tuple-stream.inline.h   |   59 -
 be/src/runtime/bufferpool/buffer-pool.cc        |   12 +-
 be/src/runtime/bufferpool/buffer-pool.h         |    8 +
 be/src/runtime/bufferpool/reservation-tracker.h |    4 +
 be/src/runtime/disk-io-mgr.cc                   |    7 +-
 be/src/runtime/exec-env.cc                      |   35 +-
 be/src/runtime/exec-env.h                       |    4 +-
 be/src/runtime/fragment-instance-state.cc       |    2 -
 be/src/runtime/initial-reservations.cc          |   90 +
 be/src/runtime/initial-reservations.h           |   79 +
 be/src/runtime/query-exec-mgr.cc                |    2 +
 be/src/runtime/query-state.cc                   |   91 +-
 be/src/runtime/query-state.h                    |   51 +-
 be/src/runtime/row-batch.cc                     |   19 -
 be/src/runtime/row-batch.h                      |   13 -
 be/src/runtime/runtime-filter.h                 |    1 +
 be/src/runtime/runtime-state.cc                 |   52 +-
 be/src/runtime/runtime-state.h                  |   32 +-
 be/src/runtime/sorter.cc                        | 1058 ++++++------
 be/src/runtime/sorter.h                         |   65 +-
 be/src/runtime/test-env.cc                      |   23 +-
 be/src/runtime/test-env.h                       |    9 +-
 be/src/runtime/tmp-file-mgr-test.cc             |   10 +-
 be/src/runtime/tmp-file-mgr.h                   |   23 +-
 be/src/service/client-request-state.cc          |    4 +-
 be/src/service/query-options.cc                 |   28 +-
 be/src/service/query-options.h                  |    6 +-
 be/src/util/bloom-filter.h                      |    2 +-
 be/src/util/static-asserts.cc                   |    2 -
 common/thrift/Frontend.thrift                   |   16 +-
 common/thrift/ImpalaInternalService.thrift      |   22 +-
 common/thrift/ImpalaService.thrift              |    8 +-
 common/thrift/PlanNodes.thrift                  |   18 +
 common/thrift/generate_error_codes.py           |   10 +-
 .../org/apache/impala/common/RuntimeEnv.java    |   10 -
 .../apache/impala/planner/AggregationNode.java  |   11 +-
 .../apache/impala/planner/AnalyticEvalNode.java |    7 +-
 .../impala/planner/DataSourceScanNode.java      |    2 +-
 .../apache/impala/planner/DataStreamSink.java   |    2 +-
 .../org/apache/impala/planner/EmptySetNode.java |    2 +-
 .../org/apache/impala/planner/ExchangeNode.java |    2 +-
 .../apache/impala/planner/HBaseScanNode.java    |    2 +-
 .../apache/impala/planner/HBaseTableSink.java   |    2 +-
 .../org/apache/impala/planner/HashJoinNode.java |    8 +-
 .../org/apache/impala/planner/HdfsScanNode.java |    4 +-
 .../apache/impala/planner/HdfsTableSink.java    |    2 +-
 .../apache/impala/planner/JoinBuildSink.java    |    2 +-
 .../org/apache/impala/planner/KuduScanNode.java |    2 +-
 .../apache/impala/planner/KuduTableSink.java    |    2 +-
 .../impala/planner/NestedLoopJoinNode.java      |    5 +-
 .../org/apache/impala/planner/PlanNode.java     |   14 +-
 .../org/apache/impala/planner/PlanRootSink.java |    2 +-
 .../java/org/apache/impala/planner/Planner.java |   12 +-
 .../apache/impala/planner/ResourceProfile.java  |   72 +-
 .../org/apache/impala/planner/SelectNode.java   |    2 +-
 .../impala/planner/SingularRowSrcNode.java      |    2 +-
 .../org/apache/impala/planner/SortNode.java     |   47 +-
 .../org/apache/impala/planner/SubplanNode.java  |    2 +-
 .../org/apache/impala/planner/UnionNode.java    |    2 +-
 .../org/apache/impala/planner/UnnestNode.java   |    2 +-
 .../org/apache/impala/service/Frontend.java     |    2 +-
 .../org/apache/impala/planner/PlannerTest.java  |    2 -
 .../queries/PlannerTest/constant-folding.test   |   32 +-
 .../queries/PlannerTest/disable-codegen.test    |    2 +-
 .../PlannerTest/fk-pk-join-detection.test       |   52 +-
 .../queries/PlannerTest/mt-dop-validation.test  |   30 +-
 .../queries/PlannerTest/parquet-filtering.test  |    6 +-
 .../PlannerTest/resource-requirements.test      |  418 ++---
 .../PlannerTest/sort-expr-materialization.test  |   30 +-
 .../PlannerTest/spillable-buffer-sizing.test    |  112 +-
 .../queries/PlannerTest/tablesample.test        |    4 +-
 .../queries/QueryTest/analytic-fns.test         |   12 +-
 .../queries/QueryTest/explain-level0.test       |    2 +-
 .../queries/QueryTest/explain-level1.test       |    2 +-
 .../queries/QueryTest/explain-level2.test       |    6 +-
 .../queries/QueryTest/explain-level3.test       |    6 +-
 .../queries/QueryTest/nested-types-tpch.test    |   24 +-
 .../QueryTest/runtime_row_filters_phj.test      |    5 +-
 ...ingle-node-joins-with-limits-exhaustive.test |    2 +-
 .../QueryTest/single-node-large-sorts.test      |    2 +-
 .../queries/QueryTest/spilling.test             |   87 +-
 .../targeted-stress/queries/agg_stress.test     |    2 +-
 .../workloads/tpch/queries/insert_parquet.test  |    2 +
 tests/comparison/discrepancy_searcher.py        |    4 +-
 tests/custom_cluster/test_scratch_disk.py       |   12 +-
 tests/custom_cluster/test_spilling.py           |   47 -
 tests/query_test/test_cancellation.py           |   10 +-
 tests/query_test/test_mem_usage_scaling.py      |   31 +-
 tests/query_test/test_nested_types.py           |    1 -
 tests/query_test/test_scratch_limit.py          |   12 +-
 tests/query_test/test_sort.py                   |   26 +-
 tests/query_test/test_spilling.py               |   39 +
 123 files changed, 2885 insertions(+), 8366 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 94dc86a..be4be80 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -119,7 +119,7 @@ ir_functions = [
   ["PHJ_PROCESS_PROBE_BATCH_FULL_OUTER_JOIN",
    "_ZN6impala23PartitionedHashJoinNode17ProcessProbeBatchILi8EEEiNS_13TPrefetchMode4typeEPNS_8RowBatchEPNS_12HashTableCtxEPNS_6StatusE"],
   ["PHJ_INSERT_BATCH",
-   "_ZN6impala10PhjBuilder9Partition11InsertBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxEPNS_8RowBatchERKSt6vectorINS_19BufferedTupleStream6RowIdxESaISA_EE"],
+   "_ZN6impala10PhjBuilder9Partition11InsertBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxEPNS_8RowBatchERKSt6vectorIPhSaIS9_EEPNS_6StatusE"],
   ["HASH_TABLE_GET_HASH_SEED",
    "_ZNK6impala12HashTableCtx11GetHashSeedEv"],
   ["HASH_TABLE_GET_BUILD_EXPR_EVALUATORS",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index b789188..f6d96ae 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -23,9 +23,10 @@
 #include "exprs/agg-fn-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "udf/udf-internal.h"
@@ -34,13 +35,14 @@
 #include "common/names.h"
 
 static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB
+static const int MIN_REQUIRED_BUFFERS = 2;
 
 using namespace strings;
 
 namespace impala {
 
-AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
-    const DescriptorTbl& descs)
+AnalyticEvalNode::AnalyticEvalNode(
+    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
   : ExecNode(pool, tnode, descs),
     window_(tnode.analytic_node.window),
     intermediate_tuple_desc_(
@@ -51,7 +53,6 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
     rows_end_offset_(0),
     has_first_val_null_offset_(false),
     first_val_null_offset_(0),
-    client_(nullptr),
     child_tuple_cmp_row_(nullptr),
     last_result_idx_(-1),
     prev_pool_last_result_idx_(-1),
@@ -110,6 +111,7 @@ AnalyticEvalNode::~AnalyticEvalNode() {
 Status AnalyticEvalNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
   DCHECK_EQ(conjunct_evals_.size(), 0);
+  state_ = state;
   const TAnalyticNode& analytic_node = tnode.analytic_node;
   bool has_lead_fn = false;
 
@@ -154,6 +156,8 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   DCHECK(child(0)->row_desc()->IsPrefixOf(*row_desc()));
+  DCHECK_GE(resource_profile_.min_reservation,
+      resource_profile_.spillable_buffer_size * MIN_REQUIRED_BUFFERS);
   curr_tuple_pool_.reset(new MemPool(mem_tracker()));
   prev_tuple_pool_.reset(new MemPool(mem_tracker()));
   mem_pool_.reset(new MemPool(mem_tracker()));
@@ -175,12 +179,6 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
         fn_pool_.get(), &order_by_eq_expr_eval_));
     AddEvaluatorToFree(order_by_eq_expr_eval_);
   }
-
-  // Must be kept in sync with AnalyticEvalNode.computeResourceProfile() in fe.
-  const int MIN_REQUIRED_BUFFERS = 2;
-  RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
-      Substitute("AnalyticEvalNode id=$0 ptr=$1", id_, this),
-      MIN_REQUIRED_BUFFERS, false, mem_tracker(), state, &client_));
   return Status::OK();
 }
 
@@ -190,22 +188,20 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   RETURN_IF_ERROR(child(0)->Open(state));
-  DCHECK(client_ != nullptr);
-  DCHECK(input_stream_ == nullptr);
-  input_stream_.reset(
-      new BufferedTupleStream(state, child(0)->row_desc(), state->block_mgr(), client_,
-          false /* use_initial_small_buffers */, true /* read_write */));
-  RETURN_IF_ERROR(input_stream_->Init(id(), runtime_profile(), true));
-  bool got_write_buffer;
-  RETURN_IF_ERROR(input_stream_->PrepareForWrite(&got_write_buffer));
-  if (!got_write_buffer) {
-    return state->block_mgr()->MemLimitTooLowError(client_, id());
-  }
-  bool got_read_buffer;
-  RETURN_IF_ERROR(input_stream_->PrepareForRead(true, &got_read_buffer));
-  if (!got_read_buffer) {
-    return state->block_mgr()->MemLimitTooLowError(client_, id());
+
+  // Claim reservation after the child has been opened to reduce the peak reservation
+  // requirement.
+  if (!buffer_pool_client_.is_registered()) {
+    RETURN_IF_ERROR(ClaimBufferReservation(state));
   }
+  DCHECK(input_stream_ == nullptr);
+  input_stream_.reset(new BufferedTupleStreamV2(state, child(0)->row_desc(),
+      &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+      resource_profile_.spillable_buffer_size));
+  RETURN_IF_ERROR(input_stream_->Init(id(), true));
+  bool success;
+  RETURN_IF_ERROR(input_stream_->PrepareForReadWrite(true, &success));
+  DCHECK(success) << "Had reservation: " << buffer_pool_client_.DebugString();
 
   for (int i = 0; i < analytic_fn_evals_.size(); ++i) {
     RETURN_IF_ERROR(analytic_fn_evals_[i]->Open(state));
@@ -366,8 +362,8 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
     // the stream and continue writing/reading in unpinned mode.
     // TODO: Consider re-pinning later if the output stream is fully consumed.
     RETURN_IF_ERROR(status);
-    RETURN_IF_ERROR(
-        input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    RETURN_IF_ERROR(state_->StartSpilling(mem_tracker()));
+    input_stream_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
     VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
     if (!input_stream_->AddRow(row, &status)) {
       // Rows should be added in unpinned mode unless an error occurs.
@@ -627,7 +623,7 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
             << " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes();
   SCOPED_TIMER(evaluation_timer_);
 
-  // BufferedTupleStream::num_rows() returns the total number of rows that have been
+  // BufferedTupleStreamV2::num_rows() returns the total number of rows that have been
   // inserted into the stream (it does not decrease when we read rows), so the index of
   // the next input row that will be inserted will be the current size of the stream.
   int64_t stream_idx = input_stream_->num_rows();
@@ -857,7 +853,6 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) {
 
 void AnalyticEvalNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  if (client_ != nullptr) state->block_mgr()->ClearReservations(client_);
   // We may need to clean up input_stream_ if an error occurred at some point.
   if (input_stream_ != nullptr) {
     input_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 89c5cf3..eab9198 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -19,8 +19,7 @@
 #define IMPALA_EXEC_ANALYTIC_EVAL_NODE_H
 
 #include "exec/exec-node.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/buffered-tuple-stream.h"
+#include "runtime/buffered-tuple-stream-v2.h"
 #include "runtime/tuple.h"
 
 namespace impala {
@@ -189,6 +188,10 @@ class AnalyticEvalNode : public ExecNode {
   /// Debug string containing the window definition.
   std::string DebugWindowString() const;
 
+  /// The RuntimeState for the fragment instance containing this AnalyticEvalNode. Set
+  /// in Init().
+  RuntimeState* state_;
+
   /// Window over which the analytic functions are evaluated. Only used if fn_scope_
   /// is ROWS or RANGE.
   /// TODO: fn_scope_ and window_ are candidates to be removed during codegen
@@ -254,9 +257,6 @@ class AnalyticEvalNode : public ExecNode {
   boost::scoped_ptr<MemPool> curr_tuple_pool_;
   boost::scoped_ptr<MemPool> prev_tuple_pool_;
 
-  /// Block manager client used by input_stream_. Not owned.
-  BufferedBlockMgr::Client* client_ = nullptr;
-
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
@@ -330,15 +330,16 @@ class AnalyticEvalNode : public ExecNode {
 
   /// Buffers input rows added in ProcessChildBatch() until enough rows are able to
   /// be returned by GetNextOutputBatch(), in which case row batches are returned from
-  /// the front of the stream and the underlying buffered blocks are deleted once read.
+  /// the front of the stream and the underlying buffers are deleted once read.
   /// The number of rows that must be buffered may vary from an entire partition (e.g.
-  /// no order by clause) to a single row (e.g. ROWS windows). When the amount of
-  /// buffered data exceeds the available memory in the underlying BufferedBlockMgr,
-  /// input_stream_ is unpinned (i.e., possibly spilled to disk if necessary).
-  /// The input stream owns tuple data backing rows returned in GetNext(). The blocks
-  /// with tuple data are attached to an output row batch on eos or ReachedLimit().
+  /// no order by clause) to a single row (e.g. ROWS windows). If the amount of buffered
+  /// data in 'input_stream_' exceeds the ExecNode's buffer reservation and the stream
+  /// cannot increase the reservation, then 'input_stream_' is unpinned (i.e., spilled to
+  /// disk). The input stream owns tuple data backing rows returned in GetNext(). The
+  /// buffers with tuple data are attached to an output row batch on eos or
+  /// ReachedLimit().
   /// TODO: Consider re-pinning unpinned streams when possible.
-  boost::scoped_ptr<BufferedTupleStream> input_stream_;
+  boost::scoped_ptr<BufferedTupleStreamV2> input_stream_;
 
   /// Pool used for O(1) allocations that live until Close() or Reset().
   /// Does not own data backing tuples returned in GetNext(), so it does not

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index c3d9946..61c8d40 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -34,10 +34,10 @@
 #include "exec/empty-set-node.h"
 #include "exec/exchange-node.h"
 #include "exec/hbase-scan-node.h"
-#include "exec/hdfs-scan-node.h"
 #include "exec/hdfs-scan-node-mt.h"
-#include "exec/kudu-scan-node.h"
+#include "exec/hdfs-scan-node.h"
 #include "exec/kudu-scan-node-mt.h"
+#include "exec/kudu-scan-node.h"
 #include "exec/kudu-util.h"
 #include "exec/nested-loop-join-node.h"
 #include "exec/partial-sort-node.h"
@@ -50,9 +50,14 @@
 #include "exec/topn-node.h"
 #include "exec/union-node.h"
 #include "exec/unnest-node.h"
+#include "exprs/expr.h"
+#include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
-#include "runtime/mem-tracker.h"
+#include "runtime/exec-env.h"
+#include "runtime/initial-reservations.h"
 #include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "util/debug-util.h"
@@ -61,7 +66,10 @@
 #include "common/names.h"
 
 using namespace llvm;
+using strings::Substitute;
 
+DECLARE_int32(be_port);
+DECLARE_string(hostname);
 DEFINE_bool(enable_partitioned_hash_join, true, "Deprecated - has no effect");
 DEFINE_bool(enable_partitioned_aggregation, true, "Deprecated - has no effect");
 
@@ -116,6 +124,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
     type_(tnode.node_type),
     pool_(pool),
     row_descriptor_(descs, tnode.row_tuples, tnode.nullable_tuples),
+    resource_profile_(tnode.resource_profile),
     debug_phase_(TExecNodePhase::INVALID),
     debug_action_(TDebugAction::WAIT),
     limit_(tnode.limit),
@@ -195,7 +204,12 @@ void ExecNode::Close(RuntimeState* state) {
   ScalarExprEvaluator::Close(conjunct_evals_, state);
   ScalarExpr::Close(conjuncts_);
   if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll();
-
+  if (buffer_pool_client_.is_registered()) {
+    VLOG_FILE << id_ << " returning reservation " << resource_profile_.min_reservation;
+    state->query_state()->initial_reservations()->Return(
+        &buffer_pool_client_, resource_profile_.min_reservation);
+    state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
+  }
   if (mem_tracker() != NULL && mem_tracker()->consumption() != 0) {
     LOG(WARNING) << "Query " << state->query_id() << " may have leaked memory." << endl
                  << state->instance_mem_tracker()->LogUsage();
@@ -204,6 +218,29 @@ void ExecNode::Close(RuntimeState* state) {
   }
 }
 
+Status ExecNode::ClaimBufferReservation(RuntimeState* state) {
+  DCHECK(!buffer_pool_client_.is_registered());
+  BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool();
+  // Check the minimum buffer size in case the minimum buffer size used by the planner
+  // doesn't match this backend's.
+  if (resource_profile_.__isset.spillable_buffer_size &&
+      resource_profile_.spillable_buffer_size < buffer_pool->min_buffer_len()) {
+    return Status(Substitute("Spillable buffer size for node $0 of $1 bytes is less "
+                             "than the minimum buffer pool buffer size of $2 bytes",
+        id_, resource_profile_.spillable_buffer_size, buffer_pool->min_buffer_len()));
+  }
+
+  RETURN_IF_ERROR(buffer_pool->RegisterClient(
+      Substitute("$0 id=$1 ptr=$2", PrintPlanNodeType(type_), id_, this),
+      state->query_state()->file_group(), state->instance_buffer_reservation(),
+      mem_tracker(), resource_profile_.max_reservation, runtime_profile(),
+      &buffer_pool_client_));
+  VLOG_FILE << id_ << " claiming reservation " << resource_profile_.min_reservation;
+  state->query_state()->initial_reservations()->Claim(
+      &buffer_pool_client_, resource_profile_.min_reservation);
+  return Status::OK();
+}
+
 Status ExecNode::CreateTree(
     RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) {
   if (plan.nodes.size() == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index a107f62..60efff0 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -26,6 +26,8 @@
 #include "common/status.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "gen-cpp/PlanNodes_types.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/descriptors.h" // for RowDescriptor
 #include "util/blocking-queue.h"
 #include "util/runtime-profile.h"
@@ -227,6 +229,12 @@ class ExecNode {
  protected:
   friend class DataSink;
 
+  /// Initialize 'buffer_pool_client_' and claim the initial reservation for this
+  /// ExecNode. Only needs to be called by ExecNodes that will use the client.
+  /// The client is automatically cleaned up in Close(). Should not be called if
+  /// the client is already open.
+  Status ClaimBufferReservation(RuntimeState* state);
+
   /// Extends blocking queue for row batches. Row batches have a property that
   /// they must be processed in the order they were produced, even in cancellation
   /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
@@ -276,6 +284,9 @@ class ExecNode {
   std::vector<ExecNode*> children_;
   RowDescriptor row_descriptor_;
 
+  /// Resource information sent from the frontend.
+  const TBackendResourceProfile resource_profile_;
+
   /// debug-only: if debug_action_ is not INVALID, node will perform action in
   /// debug_phase_
   TExecNodePhase::type debug_phase_;
@@ -298,6 +309,12 @@ class ExecNode {
   /// Created in Prepare().
   boost::scoped_ptr<MemPool> expr_mem_pool_;
 
+  /// Buffer pool client for this node. Initialized with the node's minimum reservation
+  /// in ClaimBufferReservation(). After initialization, the client must hold onto at
+  /// least the minimum reservation so that it can be returned to the initial
+  /// reservations pool in Close().
+  BufferPool::ClientHandle buffer_pool_client_;
+
   bool is_closed() const { return is_closed_; }
 
   /// Pointer to the containing SubplanNode or NULL if not inside a subplan.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 42bc7e1..7a6ec9d 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -17,24 +17,27 @@
 
 #include <boost/scoped_ptr.hpp>
 
-#include <stdlib.h>
 #include <stdio.h>
+#include <stdlib.h>
 #include <iostream>
+#include <limits>
 #include <vector>
 
-#include "testutil/gtest-util.h"
 #include "common/compiler-util.h"
 #include "common/init.h"
 #include "exec/hash-table.inline.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/slot-ref.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.h"
 #include "runtime/test-env.h"
 #include "runtime/tuple-row.h"
 #include "service/fe-support.h"
+#include "testutil/gtest-util.h"
 #include "util/cpu-info.h"
 #include "util/runtime-profile-counters.h"
 #include "util/test-info.h"
@@ -51,9 +54,16 @@ class HashTableTest : public testing::Test {
   HashTableTest() : mem_pool_(&tracker_) {}
 
  protected:
+  /// Temporary runtime environment for the hash table.
   scoped_ptr<TestEnv> test_env_;
   RuntimeState* runtime_state_;
+
+  /// Hash tables and associated clients - automatically closed in TearDown().
+  vector<BufferPool::ClientHandle*> clients_;
+  vector<HashTable*> hash_tables_;
+
   ObjectPool pool_;
+  /// A dummy MemTracker used for exprs and other things we don't need to have limits on.
   MemTracker tracker_;
   MemPool mem_pool_;
   vector<ScalarExpr*> build_exprs_;
@@ -83,6 +93,8 @@ class HashTableTest : public testing::Test {
     ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_,
         &probe_expr_evals_));
     ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr));
+
+    CreateTestEnv();
   }
 
   virtual void TearDown() {
@@ -90,9 +102,34 @@ class HashTableTest : public testing::Test {
     ScalarExprEvaluator::Close(probe_expr_evals_, nullptr);
     ScalarExpr::Close(build_exprs_);
     ScalarExpr::Close(probe_exprs_);
+
+    for (HashTable* hash_table : hash_tables_) hash_table->Close();
+    hash_tables_.clear();
+
+    for (BufferPool::ClientHandle* client : clients_) {
+      test_env_->exec_env()->buffer_pool()->DeregisterClient(client);
+    }
+    clients_.clear();
+
     runtime_state_ = nullptr;
     test_env_.reset();
     mem_pool_.FreeAll();
+    pool_.Clear();
+  }
+
+  /// Initialize test_env_ and runtime_state_ with the given page size and capacity
+  /// for the given number of pages. If test_env_ was already created, then re-creates it.
+  void CreateTestEnv(int64_t min_page_size = 64 * 1024,
+      int64_t buffer_bytes_limit = 4L * 1024 * 1024 * 1024) {
+    test_env_.reset(new TestEnv());
+    test_env_->SetBufferPoolArgs(min_page_size, buffer_bytes_limit);
+    ASSERT_OK(test_env_->Init());
+
+    TQueryOptions query_options;
+    query_options.__set_default_spillable_buffer_size(min_page_size);
+    query_options.__set_min_spillable_buffer_size(min_page_size);
+    query_options.__set_buffer_pool_limit(buffer_bytes_limit);
+    ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &runtime_state_));
   }
 
   TupleRow* CreateTupleRow(int32_t val) {
@@ -116,8 +153,9 @@ class HashTableTest : public testing::Test {
 
   // Wrapper to call private methods on HashTable
   // TODO: understand google testing, there must be a more natural way to do this
-  void ResizeTable(HashTable* table, int64_t new_size, HashTableCtx* ht_ctx) {
-    table->ResizeBuckets(new_size, ht_ctx);
+  Status ResizeTable(
+      HashTable* table, int64_t new_size, HashTableCtx* ht_ctx, bool* success) {
+    return table->ResizeBuckets(new_size, ht_ctx, success);
   }
 
   // Do a full table scan on table.  All values should be between [min,max).  If
@@ -188,24 +226,41 @@ class HashTableTest : public testing::Test {
     }
   }
 
-  // Construct hash table with custom block manager. Returns result of HashTable::Init()
-  bool CreateHashTable(bool quadratic, int64_t initial_num_buckets,
-      scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024,
-      int max_num_blocks = 100, int reserved_blocks = 10) {
-    EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr(
-        next_query_id_++, max_num_blocks, block_size, nullptr, &runtime_state_));
+  /// Construct hash table and buffer pool client.
+  /// Returns true if HashTable::Init() was successful. Created objects
+  /// and resources (e.g. reservations) are automatically freed in TearDown().
+  bool CreateHashTable(bool quadratic, int64_t initial_num_buckets, HashTable** table,
+      int64_t block_size = 8 * 1024 * 1024, int max_num_blocks = 100,
+      int initial_reserved_blocks = 10, int64_t suballocator_buffer_len = 64 * 1024) {
+    BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool();
+    RuntimeProfile* profile = pool_.Add(new RuntimeProfile(&pool_, "ht"));
+
+    // Set up memory tracking for the hash table.
     MemTracker* client_tracker =
         pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
-    BufferedBlockMgr::Client* client;
-    EXPECT_OK(runtime_state_->block_mgr()->RegisterClient(
-        "", reserved_blocks, false, client_tracker, runtime_state_, &client));
+    int64_t initial_reservation_bytes = block_size * initial_reserved_blocks;
+    int64_t max_reservation_bytes = block_size * max_num_blocks;
+
+    // Set up the memory allocator.
+    BufferPool::ClientHandle* client = pool_.Add(new BufferPool::ClientHandle);
+    clients_.push_back(client);
+    EXPECT_OK(buffer_pool->RegisterClient("", nullptr,
+        runtime_state_->instance_buffer_reservation(), client_tracker,
+        max_reservation_bytes, profile, client));
+    EXPECT_TRUE(client->IncreaseReservation(initial_reservation_bytes));
+    Suballocator* allocator =
+        pool_.Add(new Suballocator(buffer_pool, client, suballocator_buffer_len));
 
     // Initial_num_buckets must be a power of two.
     EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets));
     int64_t max_num_buckets = 1L << 31;
-    table->reset(new HashTable(quadratic, runtime_state_, client, true, 1, nullptr,
-          max_num_buckets, initial_num_buckets));
-    return (*table)->Init();
+    *table = pool_.Add(new HashTable(
+        quadratic, allocator, true, 1, nullptr, max_num_buckets, initial_num_buckets));
+    hash_tables_.push_back(*table);
+    bool success;
+    Status status = (*table)->Init(&success);
+    EXPECT_OK(status);
+    return status.ok() && success;
   }
 
   // Constructs and closes a hash table.
@@ -229,14 +284,12 @@ class HashTableTest : public testing::Test {
     EXPECT_EQ(*val_row4, 4);
 
     // Create and close the hash table.
-    scoped_ptr<HashTable> hash_table;
+    HashTable* hash_table;
     bool initialized = CreateHashTable(quadratic, initial_num_buckets, &hash_table);
     EXPECT_EQ(too_big, !initialized);
     if (initialized && initial_num_buckets > 0) {
       EXPECT_NE(hash_table->ByteSize(), 0);
     }
-
-    hash_table->Close();
   }
 
   // IMPALA-2897: Build rows that are equivalent (where nullptrs are counted as equivalent)
@@ -246,7 +299,7 @@ class HashTableTest : public testing::Test {
     for (int i = 0; i < 2; ++i) build_rows[i] = CreateNullTupleRow();
 
     // Create the hash table and insert the build rows
-    scoped_ptr<HashTable> hash_table;
+    HashTable* hash_table;
     ASSERT_TRUE(CreateHashTable(true, 1024, &hash_table));
     scoped_ptr<HashTableCtx> ht_ctx;
     EXPECT_OK(HashTableCtx::Create(&pool_, runtime_state_,
@@ -256,13 +309,15 @@ class HashTableTest : public testing::Test {
 
     for (int i = 0; i < 2; ++i) {
       if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
-      BufferedTupleStream::RowIdx dummy_row_idx;
+      BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
       EXPECT_TRUE(hash_table->stores_tuples_);
-      bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, build_rows[i]);
+      Status status;
+      bool inserted =
+          hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status);
       EXPECT_TRUE(inserted);
+      ASSERT_OK(status);
     }
     EXPECT_EQ(hash_table->num_buckets() - hash_table->EmptyBuckets(), 1);
-    hash_table->Close();
     ht_ctx->Close(runtime_state_);
   }
 
@@ -282,7 +337,7 @@ class HashTableTest : public testing::Test {
     }
 
     // Create the hash table and insert the build rows
-    scoped_ptr<HashTable> hash_table;
+    HashTable* hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, initial_num_buckets, &hash_table));
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
@@ -290,52 +345,57 @@ class HashTableTest : public testing::Test {
         vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
     EXPECT_OK(ht_ctx->Open(runtime_state_));
-    bool success = hash_table->CheckAndResize(5, ht_ctx.get());
+    bool success;
+    EXPECT_OK(hash_table->CheckAndResize(5, ht_ctx.get(), &success));
     ASSERT_TRUE(success);
     for (int i = 0; i < 5; ++i) {
       if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
-      BufferedTupleStream::RowIdx dummy_row_idx;
+      BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
       EXPECT_TRUE(hash_table->stores_tuples_);
-      bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, build_rows[i]);
+      bool inserted =
+          hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status);
       EXPECT_TRUE(inserted);
+      ASSERT_OK(status);
     }
     EXPECT_EQ(hash_table->size(), 5);
 
     // Do a full table scan and validate returned pointers
-    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
+    FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
 
     // Double the size of the hash table and scan again.
-    ResizeTable(hash_table.get(), 2048, ht_ctx.get());
+    EXPECT_OK(ResizeTable(hash_table, 2048, ht_ctx.get(), &success));
+    EXPECT_TRUE(success);
     EXPECT_EQ(hash_table->num_buckets(), 2048);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
+    FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
 
     // Try to shrink and scan again.
-    ResizeTable(hash_table.get(), 64, ht_ctx.get());
+    EXPECT_OK(ResizeTable(hash_table, 64, ht_ctx.get(), &success));
+    EXPECT_TRUE(success);
     EXPECT_EQ(hash_table->num_buckets(), 64);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
+    FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
 
     // Resize to 8, which is the smallest value to fit the number of filled buckets.
-    ResizeTable(hash_table.get(), 8, ht_ctx.get());
+    EXPECT_OK(ResizeTable(hash_table, 8, ht_ctx.get(), &success));
+    EXPECT_TRUE(success);
     EXPECT_EQ(hash_table->num_buckets(), 8);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
+    FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
 
-    hash_table->Close();
     ht_ctx->Close(runtime_state_);
   }
 
-  void ScanTest(bool quadratic, int initial_size, int rows_to_insert,
-                int additional_rows) {
-    scoped_ptr<HashTable> hash_table;
+  void ScanTest(
+      bool quadratic, int initial_size, int rows_to_insert, int additional_rows) {
+    HashTable* hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, initial_size, &hash_table));
 
     int total_rows = rows_to_insert + additional_rows;
@@ -347,19 +407,21 @@ class HashTableTest : public testing::Test {
     EXPECT_OK(ht_ctx->Open(runtime_state_));
 
     // Add 1 row with val 1, 2 with val 2, etc.
+    bool success;
     vector<TupleRow*> build_rows;
     ProbeTestData* probe_rows = new ProbeTestData[total_rows];
     probe_rows[0].probe_row = CreateTupleRow(0);
     for (int val = 1; val <= rows_to_insert; ++val) {
-      bool success = hash_table->CheckAndResize(val, ht_ctx.get());
+      EXPECT_OK(hash_table->CheckAndResize(val, ht_ctx.get(), &success));
       EXPECT_TRUE(success) << " failed to resize: " << val;
       probe_rows[val].probe_row = CreateTupleRow(val);
       for (int i = 0; i < val; ++i) {
         TupleRow* row = CreateTupleRow(val);
         if (!ht_ctx->EvalAndHashBuild(row)) continue;
-        BufferedTupleStream::RowIdx dummy_row_idx;
+        BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
+        ASSERT_TRUE(hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status));
+        ASSERT_OK(status);
         build_rows.push_back(row);
         probe_rows[val].expected_build_rows.push_back(row);
       }
@@ -371,21 +433,22 @@ class HashTableTest : public testing::Test {
     }
 
     // Test that all the builds were found.
-    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
+    ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true);
 
     // Resize and try again.
     int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows);
-    ResizeTable(hash_table.get(), target_size, ht_ctx.get());
+    EXPECT_OK(ResizeTable(hash_table, target_size, ht_ctx.get(), &success));
+    EXPECT_TRUE(success);
     EXPECT_EQ(hash_table->num_buckets(), target_size);
-    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
+    ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true);
 
     target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1);
-    ResizeTable(hash_table.get(), target_size, ht_ctx.get());
+    EXPECT_OK(ResizeTable(hash_table, target_size, ht_ctx.get(), &success));
+    EXPECT_TRUE(success);
     EXPECT_EQ(hash_table->num_buckets(), target_size);
-    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
+    ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true);
 
     delete [] probe_rows;
-    hash_table->Close();
     ht_ctx->Close(runtime_state_);
   }
 
@@ -395,9 +458,11 @@ class HashTableTest : public testing::Test {
     uint64_t num_to_add = 4;
     int expected_size = 0;
 
-    MemTracker tracker(100 * 1024 * 1024);
-    scoped_ptr<HashTable> hash_table;
-    ASSERT_TRUE(CreateHashTable(quadratic, num_to_add, &hash_table));
+    // Need enough memory for two hash table bucket directories during resize.
+    const int64_t mem_limit_mb = 128 + 64;
+    HashTable* hash_table;
+    ASSERT_TRUE(
+        CreateHashTable(quadratic, num_to_add, &hash_table, 1024 * 1024, mem_limit_mb));
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */,
@@ -408,27 +473,32 @@ class HashTableTest : public testing::Test {
     // entries. When num_to_add == 4, then the total number of inserts is 4194300.
     int build_row_val = 0;
     for (int i = 0; i < 20; ++i) {
-      // Currently the mem used for the bucket is not being tracked by the mem tracker.
-      // Thus the resize is expected to be successful.
-      // TODO: Keep track of the mem used for the buckets and test cases where we actually
-      // hit OOM.
-      // TODO: Insert duplicates to also hit OOM.
-      bool success = hash_table->CheckAndResize(num_to_add, ht_ctx.get());
-      EXPECT_TRUE(success) << " failed to resize: " << num_to_add;
+      bool success;
+      EXPECT_OK(hash_table->CheckAndResize(num_to_add, ht_ctx.get(), &success));
+      EXPECT_TRUE(success) << " failed to resize: " << num_to_add << "\n"
+                           << tracker_.LogUsage() << "\n"
+                           << clients_.back()->DebugString();
       for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
         TupleRow* row = CreateTupleRow(build_row_val);
         if (!ht_ctx->EvalAndHashBuild(row)) continue;
-        BufferedTupleStream::RowIdx dummy_row_idx;
+        BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
+        bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status);
+        ASSERT_OK(status);
         if (!inserted) goto done_inserting;
       }
       expected_size += num_to_add;
       num_to_add *= 2;
     }
- done_inserting:
-    EXPECT_FALSE(tracker.LimitExceeded());
+  done_inserting:
     EXPECT_EQ(hash_table->size(), 4194300);
+
+    // The next allocation should put us over the limit, since we'll need 128MB for
+    // the old buckets and 256MB for the new buckets.
+    bool success;
+    EXPECT_OK(hash_table->CheckAndResize(num_to_add * 2, ht_ctx.get(), &success));
+    EXPECT_FALSE(success);
+
     // Validate that we can find the entries before we went over the limit
     for (int i = 0; i < expected_size * 5; i += 100000) {
       TupleRow* probe_row = CreateTupleRow(i);
@@ -441,7 +511,34 @@ class HashTableTest : public testing::Test {
         EXPECT_TRUE(iter.AtEnd()) << " i: " << i;
       }
     }
-    hash_table->Close();
+
+    // Insert duplicates to also hit OOM.
+    int64_t num_duplicates_inserted = 0;
+    const int DUPLICATE_VAL = 1234;
+    while (true) {
+      TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL);
+      if (!ht_ctx->EvalAndHashBuild(duplicate_row)) continue;
+      BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+      bool inserted =
+          hash_table->Insert(ht_ctx.get(), dummy_flat_row, duplicate_row, &status);
+      ASSERT_OK(status);
+      if (!inserted) break;
+      ++num_duplicates_inserted;
+    }
+
+    // Check that the duplicates that we successfully inserted are all present.
+    TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL);
+    ASSERT_TRUE(ht_ctx->EvalAndHashProbe(duplicate_row));
+    HashTable::Iterator iter = hash_table->FindProbeRow(ht_ctx.get());
+    ValidateMatch(duplicate_row, iter.GetRow());
+    for (int64_t i = 0; i < num_duplicates_inserted; ++i) {
+      ASSERT_FALSE(iter.AtEnd());
+      iter.NextDuplicate();
+      ValidateMatch(duplicate_row, iter.GetRow());
+    }
+    iter.NextDuplicate();
+    EXPECT_TRUE(iter.AtEnd());
+
     ht_ctx->Close(runtime_state_);
   }
 
@@ -450,7 +547,7 @@ class HashTableTest : public testing::Test {
   // enough space in the hash table (it is also expected to be slow). It also expects that
   // a probe for a N+1 element will return BUCKET_NOT_FOUND.
   void InsertFullTest(bool quadratic, int table_size) {
-    scoped_ptr<HashTable> hash_table;
+    HashTable* hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, table_size, &hash_table));
     EXPECT_EQ(hash_table->EmptyBuckets(), table_size);
     scoped_ptr<HashTableCtx> ht_ctx;
@@ -472,10 +569,11 @@ class HashTableTest : public testing::Test {
 
       // Insert using both Insert() and FindBucket() methods.
       if (build_row_val % 2 == 0) {
-        BufferedTupleStream::RowIdx dummy_row_idx;
+        BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
+        bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status);
         EXPECT_TRUE(inserted);
+        ASSERT_OK(status);
       } else {
         iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
         EXPECT_FALSE(iter.AtEnd());
@@ -511,20 +609,20 @@ class HashTableTest : public testing::Test {
     EXPECT_TRUE(iter.AtEnd());
     EXPECT_FALSE(found);
 
-    hash_table->Close();
     ht_ctx->Close(runtime_state_);
   }
 
   // This test makes sure we can tolerate the low memory case where we do not have enough
   // memory to allocate the array of buckets for the hash table.
   void VeryLowMemTest(bool quadratic) {
-    const int block_size = 2 * 1024;
+    const int64_t block_size = 2 * 1024;
     const int max_num_blocks = 1;
-    const int reserved_blocks = 0;
     const int table_size = 1024;
-    scoped_ptr<HashTable> hash_table;
-    ASSERT_FALSE(CreateHashTable(quadratic, table_size, &hash_table, block_size,
-          max_num_blocks, reserved_blocks));
+    CreateTestEnv(block_size, block_size * max_num_blocks);
+
+    HashTable* hash_table;
+    ASSERT_FALSE(CreateHashTable(
+        quadratic, table_size, &hash_table, block_size, max_num_blocks, 0, 1024));
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */, vector<bool>(build_exprs_.size(), false), 1, 0, 1,
@@ -532,7 +630,6 @@ class HashTableTest : public testing::Test {
     EXPECT_OK(status);
     HashTable::Iterator iter = hash_table->Begin(ht_ctx.get());
     EXPECT_TRUE(iter.AtEnd());
-    hash_table->Close();
     ht_ctx->Close(runtime_state_);
   }
 };
@@ -612,8 +709,6 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {
 
 // Test that hashing empty string updates hash value.
 TEST_F(HashTableTest, HashEmpty) {
-  EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr(
-      0, 100, 8 * 1024 * 1024, nullptr, &runtime_state_));
   scoped_ptr<HashTableCtx> ht_ctx;
   Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
       probe_exprs_, false /* !stores_nulls_ */,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index a4856e9..aacedc2 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -26,7 +26,7 @@
 #include "exprs/slot-ref.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-block-mgr.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/runtime-state.h"
@@ -37,8 +37,17 @@
 #include "common/names.h"
 
 using namespace impala;
-using namespace llvm;
-using namespace strings;
+using llvm::APFloat;
+using llvm::ArrayRef;
+using llvm::BasicBlock;
+using llvm::ConstantFP;
+using llvm::Function;
+using llvm::LLVMContext;
+using llvm::PHINode;
+using llvm::PointerType;
+using llvm::Type;
+using llvm::Value;
+using strings::Substitute;
 
 DEFINE_bool(enable_quadratic_probing, true, "Enable quadratic probing hash table");
 
@@ -85,12 +94,6 @@ static int64_t NULL_VALUE[] = {
 static_assert(sizeof(NULL_VALUE) >= ColumnType::MAX_CHAR_LENGTH,
     "NULL_VALUE must be at least as large as the largest possible slot");
 
-// The first NUM_SMALL_BLOCKS of nodes_ are made of blocks less than the IO size (of 8MB)
-// to reduce the memory footprint of small queries. In particular, we always first use a
-// 64KB and a 512KB block before starting using IO-sized blocks.
-static const int64_t INITIAL_DATA_PAGE_SIZES[] = { 64 * 1024, 512 * 1024 };
-static const int NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof(int64_t);
-
 HashTableCtx::HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
     const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
     const std::vector<bool>& finds_nulls, int32_t initial_seed,
@@ -378,21 +381,20 @@ void HashTableCtx::ExprValuesCache::ResetForRead() {
   ResetIterators();
 }
 
-const double HashTable::MAX_FILL_FACTOR = 0.75f;
+constexpr double HashTable::MAX_FILL_FACTOR;
+constexpr int64_t HashTable::DATA_PAGE_SIZE;
 
-HashTable* HashTable::Create(RuntimeState* state,
-    BufferedBlockMgr::Client* client, bool stores_duplicates, int num_build_tuples,
-    BufferedTupleStream* tuple_stream, int64_t max_num_buckets,
+HashTable* HashTable::Create(Suballocator* allocator, bool stores_duplicates,
+    int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
     int64_t initial_num_buckets) {
-  return new HashTable(FLAGS_enable_quadratic_probing, state, client, stores_duplicates,
+  return new HashTable(FLAGS_enable_quadratic_probing, allocator, stores_duplicates,
       num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets);
 }
 
-HashTable::HashTable(bool quadratic_probing, RuntimeState* state,
-    BufferedBlockMgr::Client* client, bool stores_duplicates, int num_build_tuples,
-    BufferedTupleStream* stream, int64_t max_num_buckets, int64_t num_buckets)
-  : state_(state),
-    block_mgr_client_(client),
+HashTable::HashTable(bool quadratic_probing, Suballocator* allocator,
+    bool stores_duplicates, int num_build_tuples, BufferedTupleStreamV2* stream,
+    int64_t max_num_buckets, int64_t num_buckets)
+  : allocator_(allocator),
     tuple_stream_(stream),
     stores_tuples_(num_build_tuples == 1),
     stores_duplicates_(stores_duplicates),
@@ -410,26 +412,23 @@ HashTable::HashTable(bool quadratic_probing, RuntimeState* state,
     has_matches_(false),
     num_probes_(0), num_failed_probes_(0), travel_length_(0), num_hash_collisions_(0),
     num_resizes_(0) {
-  DCHECK_EQ((num_buckets & (num_buckets-1)), 0) << "num_buckets must be a power of 2";
+  DCHECK_EQ((num_buckets & (num_buckets - 1)), 0) << "num_buckets must be a power of 2";
   DCHECK_GT(num_buckets, 0) << "num_buckets must be larger than 0";
   DCHECK(stores_tuples_ || stream != NULL);
-  DCHECK(client != NULL);
 }
 
-bool HashTable::Init() {
+Status HashTable::Init(bool* got_memory) {
   int64_t buckets_byte_size = num_buckets_ * sizeof(Bucket);
-  if (!state_->block_mgr()->ConsumeMemory(block_mgr_client_, buckets_byte_size)) {
-    num_buckets_ = 0;
-    return false;
-  }
-  buckets_ = reinterpret_cast<Bucket*>(malloc(buckets_byte_size));
-  if (buckets_ == NULL) {
-    state_->block_mgr()->ReleaseMemory(block_mgr_client_, buckets_byte_size);
+  RETURN_IF_ERROR(allocator_->Allocate(buckets_byte_size, &bucket_allocation_));
+  if (bucket_allocation_ == nullptr) {
     num_buckets_ = 0;
-    return false;
+    *got_memory = false;
+    return Status::OK();
   }
+  buckets_ = reinterpret_cast<Bucket*>(bucket_allocation_->data());
   memset(buckets_, 0, buckets_byte_size);
-  return true;
+  *got_memory = true;
+  return Status::OK();
 }
 
 void HashTable::Close() {
@@ -439,36 +438,39 @@ void HashTable::Close() {
   const int64_t HEAVILY_USED = 1024 * 1024;
   // TODO: These statistics should go to the runtime profile as well.
   if ((num_buckets_ > LARGE_HT) || (num_probes_ > HEAVILY_USED)) VLOG(2) << PrintStats();
-  for (int i = 0; i < data_pages_.size(); ++i) {
-    data_pages_[i]->Delete();
-  }
+  for (auto& data_page : data_pages_) allocator_->Free(move(data_page));
+  data_pages_.clear();
   if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) {
     ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(-total_data_page_size_);
   }
-  data_pages_.clear();
-  if (buckets_ != NULL) free(buckets_);
-  state_->block_mgr()->ReleaseMemory(block_mgr_client_, num_buckets_ * sizeof(Bucket));
+  if (bucket_allocation_ != nullptr) allocator_->Free(move(bucket_allocation_));
 }
 
-bool HashTable::CheckAndResize(uint64_t buckets_to_fill, const HashTableCtx* ht_ctx) {
+Status HashTable::CheckAndResize(
+    uint64_t buckets_to_fill, const HashTableCtx* ht_ctx, bool* got_memory) {
   uint64_t shift = 0;
   while (num_filled_buckets_ + buckets_to_fill >
          (num_buckets_ << shift) * MAX_FILL_FACTOR) {
-    // TODO: next prime instead of double?
     ++shift;
   }
-  if (shift > 0) return ResizeBuckets(num_buckets_ << shift, ht_ctx);
-  return true;
+  if (shift > 0) return ResizeBuckets(num_buckets_ << shift, ht_ctx, got_memory);
+  *got_memory = true;
+  return Status::OK();
 }
 
-bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) {
-  DCHECK_EQ((num_buckets & (num_buckets-1)), 0)
+Status HashTable::ResizeBuckets(
+    int64_t num_buckets, const HashTableCtx* ht_ctx, bool* got_memory) {
+  DCHECK_EQ((num_buckets & (num_buckets - 1)), 0)
       << "num_buckets=" << num_buckets << " must be a power of 2";
-  DCHECK_GT(num_buckets, num_filled_buckets_) << "Cannot shrink the hash table to "
-      "smaller number of buckets than the number of filled buckets.";
-  VLOG(2) << "Resizing hash table from "
-          << num_buckets_ << " to " << num_buckets << " buckets.";
-  if (max_num_buckets_ != -1 && num_buckets > max_num_buckets_) return false;
+  DCHECK_GT(num_buckets, num_filled_buckets_)
+    << "Cannot shrink the hash table to smaller number of buckets than the number of "
+    << "filled buckets.";
+  VLOG(2) << "Resizing hash table from " << num_buckets_ << " to " << num_buckets
+          << " buckets.";
+  if (max_num_buckets_ != -1 && num_buckets > max_num_buckets_) {
+    *got_memory = false;
+    return Status::OK();
+  }
   ++num_resizes_;
 
   // All memory that can grow proportional to the input should come from the block mgrs
@@ -476,14 +478,16 @@ bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) {
   // Note that while we copying over the contents of the old hash table, we need to have
   // allocated both the old and the new hash table. Once we finish, we return the memory
   // of the old hash table.
-  int64_t old_size = num_buckets_ * sizeof(Bucket);
+  // int64_t old_size = num_buckets_ * sizeof(Bucket);
   int64_t new_size = num_buckets * sizeof(Bucket);
-  if (!state_->block_mgr()->ConsumeMemory(block_mgr_client_, new_size)) return false;
-  Bucket* new_buckets = reinterpret_cast<Bucket*>(malloc(new_size));
-  if (new_buckets == NULL) {
-    state_->block_mgr()->ReleaseMemory(block_mgr_client_, new_size);
-    return false;
+
+  unique_ptr<Suballocation> new_allocation;
+  RETURN_IF_ERROR(allocator_->Allocate(new_size, &new_allocation));
+  if (new_allocation == NULL) {
+    *got_memory = false;
+    return Status::OK();
   }
+  Bucket* new_buckets = reinterpret_cast<Bucket*>(new_allocation->data());
   memset(new_buckets, 0, new_size);
 
   // Walk the old table and copy all the filled buckets to the new (resized) table.
@@ -503,28 +507,22 @@ bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) {
   }
 
   num_buckets_ = num_buckets;
-  free(buckets_);
-  buckets_ = new_buckets;
-  state_->block_mgr()->ReleaseMemory(block_mgr_client_, old_size);
-  return true;
+  allocator_->Free(move(bucket_allocation_));
+  bucket_allocation_ = move(new_allocation);
+  buckets_ = reinterpret_cast<Bucket*>(bucket_allocation_->data());
+  *got_memory = true;
+  return Status::OK();
 }
 
-bool HashTable::GrowNodeArray() {
-  int64_t page_size = 0;
-  page_size = state_->block_mgr()->max_block_size();
-  if (data_pages_.size() < NUM_SMALL_DATA_PAGES) {
-    page_size = min(page_size, INITIAL_DATA_PAGE_SIZES[data_pages_.size()]);
-  }
-  BufferedBlockMgr::Block* block = NULL;
-  Status status = state_->block_mgr()->GetNewBlock(
-      block_mgr_client_, NULL, &block, page_size);
-  DCHECK(status.ok() || block == NULL);
-  if (block == NULL) return false;
-  data_pages_.push_back(block);
-  next_node_ = block->Allocate<DuplicateNode>(page_size);
-  ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(page_size);
-  node_remaining_current_page_ = page_size / sizeof(DuplicateNode);
-  total_data_page_size_ += page_size;
+bool HashTable::GrowNodeArray(Status* status) {
+  unique_ptr<Suballocation> allocation;
+  *status = allocator_->Allocate(DATA_PAGE_SIZE, &allocation);
+  if (!status->ok() || allocation == nullptr) return false;
+  next_node_ = reinterpret_cast<DuplicateNode*>(allocation->data());
+  data_pages_.push_back(move(allocation));
+  ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(DATA_PAGE_SIZE);
+  node_remaining_current_page_ = DATA_PAGE_SIZE / sizeof(DuplicateNode);
+  total_data_page_size_ += DATA_PAGE_SIZE;
   return true;
 }
 
@@ -533,8 +531,7 @@ void HashTable::DebugStringTuple(stringstream& ss, HtData& htdata,
   if (stores_tuples_) {
     ss << "(" << htdata.tuple << ")";
   } else {
-    ss << "(" << htdata.idx.block() << ", " << htdata.idx.idx()
-       << ", " << htdata.idx.offset() << ")";
+    ss << "(" << htdata.flat_row << ")";
   }
   if (desc != NULL) {
     Tuple* row[num_build_tuples_];

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 9ba5b04..297e619 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -15,19 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #ifndef IMPALA_EXEC_HASH_TABLE_H
 #define IMPALA_EXEC_HASH_TABLE_H
 
+#include <memory>
 #include <vector>
 #include <boost/cstdint.hpp>
 #include <boost/scoped_ptr.hpp>
+
 #include "codegen/impala-ir.h"
-#include "common/logging.h"
 #include "common/compiler-util.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/buffered-tuple-stream.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "common/logging.h"
+#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/suballocator.h"
 #include "runtime/tuple-row.h"
 #include "util/bitmap.h"
 #include "util/hash-util.h"
@@ -101,7 +103,6 @@ class HashTable;
 /// Inserts().  We may want to optimize joins more heavily for Inserts() (in particular
 /// growing).
 /// TODO: Batched interface for inserts and finds.
-/// TODO: Do we need to check mem limit exceeded so often. Check once per batch?
 /// TODO: as an optimization, compute variable-length data size for the agg node.
 
 /// Control block for a hash table. This class contains the logic as well as the variables
@@ -525,13 +526,15 @@ class HashTableCtx {
 /// nodes do not contain the hash value, because all the linked nodes have the same hash
 /// value, the one in the bucket. The data is either a tuple stream index or a Tuple*.
 /// This array of buckets is sparse, we are shooting for up to 3/4 fill factor (75%). The
-/// data allocated by the hash table comes from the BufferedBlockMgr.
+/// data allocated by the hash table comes from the BufferPool.
 class HashTable {
  private:
-
-  /// Either the row in the tuple stream or a pointer to the single tuple of this row.
+  /// Rows are represented as pointers into the BufferedTupleStream data with one
+  /// of two formats, depending on the number of tuples in the row.
   union HtData {
-    BufferedTupleStream::RowIdx idx;
+    // For rows with multiple tuples per row, a pointer to the flattened TupleRow.
+    BufferedTupleStreamV2::FlatRowPtr flat_row;
+    // For rows with one tuple per row, a pointer to the Tuple itself.
     Tuple* tuple;
   };
 
@@ -584,7 +587,7 @@ class HashTable {
 
   /// Returns a newly allocated HashTable. The probing algorithm is set by the
   /// FLAG_enable_quadratic_probing.
-  ///  - client: block mgr client to allocate data pages from.
+  ///  - allocator: allocator to allocate bucket directory and data pages from.
   ///  - stores_duplicates: true if rows with duplicate keys may be inserted into the
   ///    hash table.
   ///  - num_build_tuples: number of Tuples in the build tuple row.
@@ -596,31 +599,35 @@ class HashTable {
   ///    -1, if it unlimited.
   ///  - initial_num_buckets: number of buckets that the hash table should be initialized
   ///    with.
-  static HashTable* Create(RuntimeState* state, BufferedBlockMgr::Client* client,
-      bool stores_duplicates, int num_build_tuples, BufferedTupleStream* tuple_stream,
-      int64_t max_num_buckets, int64_t initial_num_buckets);
+  static HashTable* Create(Suballocator* allocator, bool stores_duplicates,
+      int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+      int64_t initial_num_buckets);
 
-  /// Allocates the initial bucket structure. Returns false if OOM.
-  bool Init();
+  /// Allocates the initial bucket structure. Returns a non-OK status if an error is
+  /// encountered. If an OK status is returned , 'got_memory' is set to indicate whether
+  /// enough memory for the initial buckets was allocated from the Suballocator.
+  Status Init(bool* got_memory) WARN_UNUSED_RESULT;
 
   /// Call to cleanup any resources. Must be called once.
   void Close();
 
-  /// Inserts the row to the hash table. Returns true if the insertion was successful.
-  /// Always returns true if the table has free buckets and the key is not a duplicate.
-  /// The caller is responsible for ensuring that the table has free buckets
-  /// 'idx' is the index into tuple_stream_ for this row. If the row contains more than
-  /// one tuple, the 'idx' is stored instead of the 'row'. The 'row' is not copied by the
-  /// hash table and the caller must guarantee it stays in memory. This will not grow the
-  /// hash table. In the case that there is a need to insert a duplicate node, instead of
-  /// filling a new bucket, and there is not enough memory to insert a duplicate node,
-  /// the insert fails and this function returns false.
-  /// Used during the build phase of hash joins.
+  /// Inserts the row to the hash table. The caller is responsible for ensuring that the
+  /// table has free buckets. Returns true if the insertion was successful. Always
+  /// returns true if the table has free buckets and the key is not a duplicate. If the
+  /// key was a duplicate and memory could not be allocated for the new duplicate node,
+  /// returns false. If an error is encountered while creating a duplicate node, returns
+  /// false and sets 'status' to the error.
+  ///
+  /// 'flat_row' is a pointer to the flattened row in 'tuple_stream_' If the row contains
+  /// only one tuple, a pointer to that tuple is stored. Otherwise the 'flat_row' pointer
+  /// is stored. The 'row' is not copied by the hash table and the caller must guarantee
+  /// it stays in memory. This will not grow the hash table.
   bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx,
-      const BufferedTupleStream::RowIdx& idx, TupleRow* row);
+      BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row,
+      Status* status) WARN_UNUSED_RESULT;
 
   /// Prefetch the hash table bucket which the given hash value 'hash' maps to.
-  template<const bool READ>
+  template <const bool READ>
   void IR_ALWAYS_INLINE PrefetchBucket(uint32_t hash);
 
   /// Returns an iterator to the bucket that matches the probe expression results that
@@ -680,12 +687,17 @@ class HashTable {
   /// Calculates the fill factor if 'buckets_to_fill' additional buckets were to be
   /// filled and resizes the hash table so that the projected fill factor is below the
   /// max fill factor.
-  /// If it returns true, then it is guaranteed at least 'rows_to_add' rows can be
-  /// inserted without need to resize.
-  bool CheckAndResize(uint64_t buckets_to_fill, const HashTableCtx* ht_ctx);
+  /// If 'got_memory' is true, then it is guaranteed at least 'rows_to_add' rows can be
+  /// inserted without need to resize. If there is not enough memory available to
+  /// resize the hash table, Status::OK() is returned and 'got_memory' is false. If a
+  /// another error occurs, an error status may be returned.
+  Status CheckAndResize(uint64_t buckets_to_fill, const HashTableCtx* ht_ctx,
+      bool* got_memory) WARN_UNUSED_RESULT;
 
   /// Returns the number of bytes allocated to the hash table from the block manager.
-  int64_t ByteSize() const { return num_buckets_ * sizeof(Bucket) + total_data_page_size_; }
+  int64_t ByteSize() const {
+    return num_buckets_ * sizeof(Bucket) + total_data_page_size_;
+  }
 
   /// Returns an iterator at the beginning of the hash table.  Advancing this iterator
   /// will traverse all elements.
@@ -792,7 +804,6 @@ class HashTable {
     TupleRow* scratch_row_;
 
     /// Current bucket idx.
-    /// TODO: Use uint32_t?
     int64_t bucket_idx_;
 
     /// Pointer to the current duplicate node.
@@ -807,9 +818,9 @@ class HashTable {
   /// of calling this constructor directly.
   ///  - quadratic_probing: set to true when the probing algorithm is quadratic, as
   ///    opposed to linear.
-  HashTable(bool quadratic_probing, RuntimeState* state, BufferedBlockMgr::Client* client,
-      bool stores_duplicates, int num_build_tuples, BufferedTupleStream* tuple_stream,
-      int64_t max_num_buckets, int64_t initial_num_buckets);
+  HashTable(bool quadratic_probing, Suballocator* allocator, bool stores_duplicates,
+      int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+      int64_t initial_num_buckets);
 
   /// Performs the probing operation according to the probing algorithm (linear or
   /// quadratic. Returns one of the following:
@@ -839,8 +850,10 @@ class HashTable {
       HashTableCtx* ht_ctx, uint32_t hash, bool* found);
 
   /// Performs the insert logic. Returns the HtData* of the bucket or duplicate node
-  /// where the data should be inserted. Returns NULL if the insert was not successful.
-  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx);
+  /// where the data should be inserted. Returns NULL if the insert was not successful
+  /// and either sets 'status' to OK if it failed because not enough reservation was
+  /// available or the error if an error was encountered.
+  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx, Status* status);
 
   /// Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
   /// duplicates, 'node' will be pointing to the head of the linked list of duplicates.
@@ -848,8 +861,8 @@ class HashTable {
   /// 'bucket_idx' to BUCKET_NOT_FOUND.
   void NextFilledBucket(int64_t* bucket_idx, DuplicateNode** node);
 
-  /// Resize the hash table to 'num_buckets'. Returns false on OOM.
-  bool ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx);
+  /// Resize the hash table to 'num_buckets'. 'got_memory' is false on OOM.
+  Status ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx, bool* got_memory);
 
   /// Appends the DuplicateNode pointed by next_node_ to 'bucket' and moves the next_node_
   /// pointer to the next DuplicateNode in the page, updating the remaining node counter.
@@ -862,9 +875,10 @@ class HashTable {
   /// the bucket is converted to a DuplicateNode. That is, the contents of 'data' of the
   /// bucket are copied to a DuplicateNode and 'data' is updated to pointing to a
   /// DuplicateNode.
-  /// Returns NULL if the node array could not grow, i.e. there was not enough memory to
-  /// allocate a new DuplicateNode.
-  DuplicateNode* IR_ALWAYS_INLINE InsertDuplicateNode(int64_t bucket_idx);
+  /// Returns NULL and sets 'status' to OK if the node array could not grow, i.e. there
+  /// was not enough memory to allocate a new DuplicateNode. Returns NULL and sets
+  /// 'status' to an error if another error was encountered.
+  DuplicateNode* IR_ALWAYS_INLINE InsertDuplicateNode(int64_t bucket_idx, Status* status);
 
   /// Resets the contents of the empty bucket with index 'bucket_idx', in preparation for
   /// an insert. Sets all the fields of the bucket other than 'data'.
@@ -877,8 +891,10 @@ class HashTable {
   /// returns the content of the first chained duplicate node of the bucket.
   TupleRow* GetRow(Bucket* bucket, TupleRow* row) const;
 
-  /// Grow the node array. Returns false on OOM.
-  bool GrowNodeArray();
+  /// Grow the node array. Returns true and sets 'status' to OK on success. Returns false
+  /// and set 'status' to OK if we can't get sufficient reservation to allocate the next
+  /// data page. Returns false and sets 'status' if another error is encountered.
+  bool GrowNodeArray(Status* status);
 
   /// Functions to be replaced by codegen to specialize the hash table.
   bool IR_NO_INLINE stores_tuples() const { return stores_tuples_; }
@@ -887,20 +903,26 @@ class HashTable {
 
   /// Load factor that will trigger growing the hash table on insert.  This is
   /// defined as the number of non-empty buckets / total_buckets
-  static const double MAX_FILL_FACTOR;
+  static constexpr double MAX_FILL_FACTOR = 0.75;
+
+  /// The size in bytes of each page of duplicate nodes. Should be large enough to fit
+  /// enough DuplicateNodes to amortise the overhead of allocating each page and low
+  /// enough to not waste excessive memory to internal fragmentation.
+  static constexpr int64_t DATA_PAGE_SIZE = 64L * 1024;
 
   RuntimeState* state_;
 
-  /// Client to allocate data pages with.
-  BufferedBlockMgr::Client* block_mgr_client_;
+  /// Suballocator to allocate data pages and hash table buckets with.
+  Suballocator* allocator_;
 
   /// Stream contains the rows referenced by the hash table. Can be NULL if the
   /// row only contains a single tuple, in which case the TupleRow indirection
   /// is removed by the hash table.
-  BufferedTupleStream* tuple_stream_;
+  BufferedTupleStreamV2* tuple_stream_;
 
-  /// Constants on how the hash table should behave. Joins and aggs have slightly
-  /// different behavior.
+  /// Constants on how the hash table should behave.
+
+  /// True if the HtData uses the Tuple* representation, or false if it uses FlatRowPtr.
   const bool stores_tuples_;
 
   /// True if duplicates may be inserted into hash table.
@@ -909,8 +931,9 @@ class HashTable {
   /// Quadratic probing enabled (as opposed to linear).
   const bool quadratic_probing_;
 
-  /// Data pages for all nodes. These are always pinned.
-  std::vector<BufferedBlockMgr::Block*> data_pages_;
+  /// Data pages for all nodes. Allocated from suballocator to reduce memory
+  /// consumption of small tables.
+  std::vector<std::unique_ptr<Suballocation>> data_pages_;
 
   /// Byte size of all buffers in data_pages_.
   int64_t total_data_page_size_;
@@ -926,8 +949,10 @@ class HashTable {
 
   const int64_t max_num_buckets_;
 
-  /// Array of all buckets. Owned by this node. Using c-style array to control
-  /// control memory footprint.
+  /// Allocation containing all buckets.
+  std::unique_ptr<Suballocation> bucket_allocation_;
+
+  /// Pointer to the 'buckets_' array from 'bucket_allocation_'.
   Bucket* buckets_;
 
   /// Total number of buckets (filled and empty).
@@ -943,9 +968,8 @@ class HashTable {
   /// Number of build tuples, used for constructing temp row* for probes.
   const int num_build_tuples_;
 
-  /// Flag used to disable spilling hash tables that already had matches in case of
-  /// right joins (IMPALA-1488).
-  /// TODO: Not fail when spilling hash tables with matches in right joins
+  /// Flag used to check that we don't lose stored matches when spilling hash tables
+  /// (IMPALA-1488).
   bool has_matches_;
 
   /// The stats below can be used for debugging perf.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h
index aff7c14..ce2f784 100644
--- a/be/src/exec/hash-table.inline.h
+++ b/be/src/exec/hash-table.inline.h
@@ -90,7 +90,8 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
   return Iterator::BUCKET_NOT_FOUND;
 }
 
-inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
+inline HashTable::HtData* HashTable::InsertInternal(
+    HashTableCtx* ht_ctx, Status* status) {
   ++num_probes_;
   bool found = false;
   uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
@@ -98,7 +99,7 @@ inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
   DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND);
   if (found) {
     // We need to insert a duplicate node, note that this may fail to allocate memory.
-    DuplicateNode* new_node = InsertDuplicateNode(bucket_idx);
+    DuplicateNode* new_node = InsertDuplicateNode(bucket_idx, status);
     if (UNLIKELY(new_node == NULL)) return NULL;
     return &new_node->htdata;
   } else {
@@ -108,14 +109,14 @@ inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
 }
 
 inline bool HashTable::Insert(HashTableCtx* ht_ctx,
-    const BufferedTupleStream::RowIdx& idx, TupleRow* row) {
-  HtData* htdata = InsertInternal(ht_ctx);
+    BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row, Status* status) {
+  HtData* htdata = InsertInternal(ht_ctx, status);
   // If successful insert, update the contents of the newly inserted entry with 'idx'.
   if (LIKELY(htdata != NULL)) {
     if (stores_tuples()) {
       htdata->tuple = row->GetTuple(0);
     } else {
-      htdata->idx = idx;
+      htdata->flat_row = flat_row;
     }
     return true;
   }
@@ -213,7 +214,8 @@ inline HashTable::DuplicateNode* HashTable::AppendNextNode(Bucket* bucket) {
   return next_node_++;
 }
 
-inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(int64_t bucket_idx) {
+inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(
+    int64_t bucket_idx, Status* status) {
   DCHECK_GE(bucket_idx, 0);
   DCHECK_LT(bucket_idx, num_buckets_);
   Bucket* bucket = &buckets_[bucket_idx];
@@ -222,12 +224,12 @@ inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(int64_t bucket_i
   // Allocate one duplicate node for the new data and one for the preexisting data,
   // if needed.
   while (node_remaining_current_page_ < 1 + !bucket->hasDuplicates) {
-    if (UNLIKELY(!GrowNodeArray())) return NULL;
+    if (UNLIKELY(!GrowNodeArray(status))) return NULL;
   }
   if (!bucket->hasDuplicates) {
     // This is the first duplicate in this bucket. It means that we need to convert
     // the current entry in the bucket to a node and link it from the bucket.
-    next_node_->htdata.idx = bucket->bucketData.htdata.idx;
+    next_node_->htdata.flat_row = bucket->bucketData.htdata.flat_row;
     DCHECK(!bucket->matched);
     next_node_->matched = false;
     next_node_->next = NULL;
@@ -246,7 +248,7 @@ inline TupleRow* IR_ALWAYS_INLINE HashTable::GetRow(HtData& htdata, TupleRow* ro
     return reinterpret_cast<TupleRow*>(&htdata.tuple);
   } else {
     // TODO: GetTupleRow() has interpreted code that iterates over the row's descriptor.
-    tuple_stream_->GetTupleRow(htdata.idx, row);
+    tuple_stream_->GetTupleRow(htdata.flat_row, row);
     return row;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/nested-loop-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index 67e6ed6..fdd94ee 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -45,8 +45,7 @@ Status NljBuilder::Send(RuntimeState* state, RowBatch* batch) {
   build_batch->AcquireState(batch);
 
   AddBuildBatch(build_batch);
-  if (build_batch->needs_deep_copy() || build_batch->num_blocks() > 0
-      || build_batch->num_buffers() > 0) {
+  if (build_batch->needs_deep_copy() || build_batch->num_buffers() > 0) {
     // This batch and earlier batches may refer to resources passed from the child
     // that aren't owned by the row batch itself. Deep copying ensures that the row
     // batches are backed by memory owned by this node that is safe to hold on to.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partial-sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 4f485d5..88b2f26 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -58,8 +58,10 @@ Status PartialSortNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
   sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_,
-      mem_tracker(), runtime_profile(), state, false));
+      mem_tracker(), &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+      runtime_profile(), state, id(), false));
   RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
+  DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   AddCodegenDisabledMessage(state);
   input_batch_.reset(
       new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
@@ -81,6 +83,9 @@ Status PartialSortNode::Open(RuntimeState* state) {
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   RETURN_IF_ERROR(child(0)->Open(state));
+  if (!buffer_pool_client_.is_registered()) {
+    RETURN_IF_ERROR(ClaimBufferReservation(state));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partial-sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index ab4c547..d40d653 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -19,7 +19,6 @@
 #define IMPALA_EXEC_PARTIAL_SORT_NODE_H
 
 #include "exec/exec-node.h"
-#include "runtime/buffered-block-mgr.h"
 #include "runtime/sorter.h"
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index cd5d336..126a2a5 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -21,7 +21,7 @@
 #include "exprs/agg-fn-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"
 
@@ -46,7 +46,8 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
   // will end up to the same partition.
   // TODO: Once we have a histogram with the number of rows per partition, we will have
   // accurate resize calls.
-  RETURN_IF_ERROR(CheckAndResizeHashPartitions(batch->num_rows(), ht_ctx));
+  RETURN_IF_ERROR(
+      CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx));
 
   HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
   const int cache_size = expr_vals_cache->capacity();
@@ -108,6 +109,7 @@ Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
   // so we can try again to insert the row.
   HashTable* hash_tbl = GetHashTable(partition_idx);
   Partition* dst_partition = hash_partitions_[partition_idx];
+  DCHECK(dst_partition != nullptr);
   DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL);
   if (hash_tbl == NULL) {
     // This partition is already spilled, just append the row.
@@ -155,24 +157,13 @@ Status PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__
     }
 
     // We did not have enough memory to add intermediate_tuple to the stream.
-    RETURN_IF_ERROR(SpillPartition());
+    RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
     if (partition->is_spilled()) {
       return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
     }
   }
 }
 
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AppendSpilledRow(Partition* __restrict__ partition,
-    TupleRow* __restrict__ row) {
-  DCHECK(!is_streaming_preagg_);
-  DCHECK(partition->is_spilled());
-  BufferedTupleStream* stream = AGGREGATED_ROWS ?
-      partition->aggregated_row_stream.get() :
-      partition->unaggregated_row_stream.get();
-  return AppendSpilledRow(stream, row);
-}
-
 Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
     TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
     HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
@@ -230,6 +221,7 @@ bool PartitionedAggregationNode::TryAddToHashTable(
   DCHECK(remaining_capacity != NULL);
   DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
   DCHECK_GE(*remaining_capacity, 0);
+  if (hash_tbl == nullptr) return false; // Hash table was not created - pass through.
   bool found;
   // This is called from ProcessBatchStreaming() so the rows are not aggregated.
   HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);