Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/21 19:39:51 UTC

[impala] 03/13: IMPALA-110 (part 1): Refactor ExecNode::buffer_pool_client_

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6e128ceb7cf800ec329dc48f6d13846e85a172aa
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Tue May 22 15:43:41 2018 -0700

    IMPALA-110 (part 1): Refactor ExecNode::buffer_pool_client_
    
    IMPALA-110 will involve refactoring PartitionedAggregationNode by
    separating out the aggregation logic into a new type called
    Aggregator, and then supporting multiple Aggregators per node to allow
    for multiple aggregation classes to be evaluated at the same time.
    
    Each Aggregator will need its own memory reservation to operate,
    which we can provide by giving each Aggregator its own
    BufferPool::ClientHandle instead of the usual
    ExecNode::buffer_pool_client_.
    
    To facilitate this, the patch refactors all of the
    buffer_pool_client_ related logic into a new class,
    ReservationManager, so that eventually each Aggregator can have its
    own ReservationManager and the logic in ClaimBufferReservation(),
    ReleaseUnusedReservation(), etc. won't be duplicated.
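
    The resulting usage pattern in an ExecNode is roughly the following
    (a sketch; see reservation-manager.h in the diff below for the
    actual interface):

        // In ExecNode::Prepare(): hand the node's name, profile, mem
        // tracker, resource profile and debug options to the manager.
        reservation_manager_.Init(
            Substitute("$0 id=$1 ptr=$2", PrintThriftEnum(type_), id_, this),
            runtime_profile_, mem_tracker_.get(), resource_profile_,
            debug_options_);

        // In a node's Open(), once its child is open:
        if (!buffer_pool_client()->is_registered()) {
          RETURN_IF_ERROR(ClaimBufferReservation(state));
        }

        // In ExecNode::Close(): returns the initial reservation and
        // deregisters the client if it was registered.
        reservation_manager_.Close(state);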
    
    Testing:
    - Passed a full run of the core tests.
    
    Change-Id: I75f92c3f4f05adeef11a70f59e0c8ff2d19bc17a
    Reviewed-on: http://gerrit.cloudera.org:8080/10493
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/analytic-eval-node.cc           |   6 +-
 be/src/exec/exec-node.cc                    |  87 +++++-------------------
 be/src/exec/exec-node.h                     |  50 ++++++--------
 be/src/exec/hdfs-scan-node-base.cc          |   6 +-
 be/src/exec/hdfs-scan-node-mt.cc            |   4 +-
 be/src/exec/hdfs-scan-node.cc               |  10 +--
 be/src/exec/partial-sort-node.cc            |   4 +-
 be/src/exec/partitioned-aggregation-node.cc |  55 +++++++--------
 be/src/exec/partitioned-hash-join-node.cc   |  33 +++++----
 be/src/exec/sort-node.cc                    |  11 ++-
 be/src/runtime/CMakeLists.txt               |   1 +
 be/src/runtime/reservation-manager.cc       | 100 ++++++++++++++++++++++++++++
 be/src/runtime/reservation-manager.h        |  89 +++++++++++++++++++++++++
 13 files changed, 290 insertions(+), 166 deletions(-)

diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index bc22d82..c153b64 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -188,17 +188,17 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
 
   // Claim reservation after the child has been opened to reduce the peak reservation
   // requirement.
-  if (!buffer_pool_client_.is_registered()) {
+  if (!buffer_pool_client()->is_registered()) {
     RETURN_IF_ERROR(ClaimBufferReservation(state));
   }
   DCHECK(input_stream_ == nullptr);
   input_stream_.reset(new BufferedTupleStream(state, child(0)->row_desc(),
-      &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+      buffer_pool_client(), resource_profile_.spillable_buffer_size,
       resource_profile_.spillable_buffer_size));
   RETURN_IF_ERROR(input_stream_->Init(id(), true));
   bool success;
   RETURN_IF_ERROR(input_stream_->PrepareForReadWrite(true, &success));
-  DCHECK(success) << "Had reservation: " << buffer_pool_client_.DebugString();
+  DCHECK(success) << "Had reservation: " << buffer_pool_client()->DebugString();
 
   for (int i = 0; i < analytic_fn_evals_.size(); ++i) {
     RETURN_IF_ERROR(analytic_fn_evals_[i]->Open(state));
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 85d965b..504a419 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -63,7 +63,6 @@
 #include "runtime/runtime-state.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
-#include "util/string-parser.h"
 
 #include "common/names.h"
 
@@ -117,8 +116,6 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
     pool_(pool),
     row_descriptor_(descs, tnode.row_tuples, tnode.nullable_tuples),
     resource_profile_(tnode.resource_profile),
-    debug_phase_(TExecNodePhase::INVALID),
-    debug_action_(TDebugAction::WAIT),
     limit_(tnode.limit),
     num_rows_returned_(0),
     runtime_profile_(RuntimeProfile::Create(pool_,
@@ -129,6 +126,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
     disable_codegen_(tnode.disable_codegen),
     is_closed_(false) {
   runtime_profile_->set_metadata(id_);
+  debug_options_.phase = TExecNodePhase::INVALID;
 }
 
 ExecNode::~ExecNode() {
@@ -159,6 +157,9 @@ Status ExecNode::Prepare(RuntimeState* state) {
   for (int i = 0; i < children_.size(); ++i) {
     RETURN_IF_ERROR(children_[i]->Prepare(state));
   }
+  reservation_manager_.Init(
+      Substitute("$0 id=$1 ptr=$2", PrintThriftEnum(type_), id_, this), runtime_profile_,
+      mem_tracker_.get(), resource_profile_, debug_options_);
   return Status::OK();
 }
 
@@ -199,12 +200,7 @@ void ExecNode::Close(RuntimeState* state) {
   ScalarExpr::Close(conjuncts_);
   if (expr_perm_pool() != nullptr) expr_perm_pool_->FreeAll();
   if (expr_results_pool() != nullptr) expr_results_pool_->FreeAll();
-  if (buffer_pool_client_.is_registered()) {
-    VLOG_FILE << id_ << " returning reservation " << resource_profile_.min_reservation;
-    state->query_state()->initial_reservations()->Return(
-        &buffer_pool_client_, resource_profile_.min_reservation);
-    state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
-  }
+  reservation_manager_.Close(state);
   if (expr_mem_tracker_ != nullptr) expr_mem_tracker_->Close();
   if (mem_tracker_ != nullptr) {
     if (mem_tracker()->consumption() != 0) {
@@ -218,57 +214,6 @@ void ExecNode::Close(RuntimeState* state) {
   }
 }
 
-Status ExecNode::ClaimBufferReservation(RuntimeState* state) {
-  DCHECK(!buffer_pool_client_.is_registered());
-  BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool();
-  // Check the minimum buffer size in case the minimum buffer size used by the planner
-  // doesn't match this backend's.
-  if (resource_profile_.__isset.spillable_buffer_size &&
-      resource_profile_.spillable_buffer_size < buffer_pool->min_buffer_len()) {
-    return Status(Substitute("Spillable buffer size for node $0 of $1 bytes is less "
-                             "than the minimum buffer pool buffer size of $2 bytes",
-        id_, resource_profile_.spillable_buffer_size, buffer_pool->min_buffer_len()));
-  }
-
-  RETURN_IF_ERROR(buffer_pool->RegisterClient(
-      Substitute("$0 id=$1 ptr=$2", PrintThriftEnum(type_), id_, this),
-      state->query_state()->file_group(), state->instance_buffer_reservation(),
-      mem_tracker(), resource_profile_.max_reservation, runtime_profile(),
-      &buffer_pool_client_));
-  VLOG_FILE << id_ << " claiming reservation " << resource_profile_.min_reservation;
-  state->query_state()->initial_reservations()->Claim(
-      &buffer_pool_client_, resource_profile_.min_reservation);
-  if (debug_action_ == TDebugAction::SET_DENY_RESERVATION_PROBABILITY &&
-      (debug_phase_ == TExecNodePhase::PREPARE || debug_phase_ == TExecNodePhase::OPEN)) {
-    // We may not have been able to enable the debug action at the start of Prepare() or
-    // Open() because the client is not registered then. Do it now to be sure that it is
-    // effective.
-    RETURN_IF_ERROR(EnableDenyReservationDebugAction());
-  }
-  return Status::OK();
-}
-
-Status ExecNode::ReleaseUnusedReservation() {
-  return buffer_pool_client_.DecreaseReservationTo(
-      buffer_pool_client_.GetUnusedReservation(), resource_profile_.min_reservation);
-}
-
-Status ExecNode::EnableDenyReservationDebugAction() {
-  DCHECK_EQ(debug_action_, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
-  DCHECK(buffer_pool_client_.is_registered());
-  // Parse [0.0, 1.0] probability.
-  StringParser::ParseResult parse_result;
-  double probability = StringParser::StringToFloat<double>(
-      debug_action_param_.c_str(), debug_action_param_.size(), &parse_result);
-  if (parse_result != StringParser::PARSE_SUCCESS || probability < 0.0
-      || probability > 1.0) {
-    return Status(Substitute(
-        "Invalid SET_DENY_RESERVATION_PROBABILITY param: '$0'", debug_action_param_));
-  }
-  buffer_pool_client_.SetDebugDenyIncreaseReservation(probability);
-  return Status::OK();
-}
-
 Status ExecNode::CreateTree(
     RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) {
   if (plan.nodes.size() == 0) {
@@ -424,9 +369,7 @@ void ExecNode::SetDebugOptions(const TDebugOptions& debug_options, ExecNode* roo
   DCHECK(debug_options.__isset.phase);
   DCHECK(debug_options.__isset.action);
   if (debug_options.node_id == -1 || root->id_ == debug_options.node_id) {
-    root->debug_phase_ = debug_options.phase;
-    root->debug_action_ = debug_options.action;
-    root->debug_action_param_ = debug_options.action_param;
+    root->debug_options_ = debug_options;
   }
   for (int i = 0; i < root->children_.size(); ++i) {
     SetDebugOptions(debug_options, root->children_[i]);
@@ -461,28 +404,28 @@ void ExecNode::CollectScanNodes(vector<ExecNode*>* nodes) {
 }
 
 Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* state) {
-  DCHECK_EQ(debug_phase_, phase);
-  if (debug_action_ == TDebugAction::FAIL) {
+  DCHECK_EQ(debug_options_.phase, phase);
+  if (debug_options_.action == TDebugAction::FAIL) {
     return Status(TErrorCode::INTERNAL_ERROR, "Debug Action: FAIL");
-  } else if (debug_action_ == TDebugAction::WAIT) {
+  } else if (debug_options_.action == TDebugAction::WAIT) {
     while (!state->is_cancelled()) {
       sleep(1);
     }
     return Status::CANCELLED;
-  } else if (debug_action_ == TDebugAction::INJECT_ERROR_LOG) {
+  } else if (debug_options_.action == TDebugAction::INJECT_ERROR_LOG) {
     state->LogError(
         ErrorMsg(TErrorCode::INTERNAL_ERROR, "Debug Action: INJECT_ERROR_LOG"));
     return Status::OK();
-  } else if (debug_action_ == TDebugAction::MEM_LIMIT_EXCEEDED) {
+  } else if (debug_options_.action == TDebugAction::MEM_LIMIT_EXCEEDED) {
     return mem_tracker()->MemLimitExceeded(state, "Debug Action: MEM_LIMIT_EXCEEDED");
   } else {
-    DCHECK_EQ(debug_action_, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
-    // We can only enable the debug action right if the buffer pool client is registered.
+    DCHECK_EQ(debug_options_.action, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
+    // We can only enable the debug action if the buffer pool client is registered.
     // If the buffer client is not registered at this point (e.g. if phase is PREPARE or
     // OPEN), then we will enable the debug action at the time when the client is
     // registered.
-    if (buffer_pool_client_.is_registered()) {
-      RETURN_IF_ERROR(EnableDenyReservationDebugAction());
+    if (reservation_manager_.buffer_pool_client()->is_registered()) {
+      RETURN_IF_ERROR(reservation_manager_.EnableDenyReservationDebugAction());
     }
   }
   return Status::OK();
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index fac8312..ad9ae10 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -29,6 +29,7 @@
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/descriptors.h" // for RowDescriptor
+#include "runtime/reservation-manager.h"
 #include "util/blocking-queue.h"
 #include "util/runtime-profile.h"
 
@@ -225,26 +226,17 @@ class ExecNode {
  protected:
   friend class DataSink;
 
-  /// Initialize 'buffer_pool_client_' and claim the initial reservation for this
-  /// ExecNode. Only needs to be called by ExecNodes that will use the client.
-  /// The client is automatically cleaned up in Close(). Should not be called if
-  /// the client is already open.
-  ///
-  /// The ExecNode must return the initial reservation to
-  /// QueryState::initial_reservations(), which is done automatically in Close() as long
-  /// as the initial reservation is not released before Close().
-  Status ClaimBufferReservation(RuntimeState* state) WARN_UNUSED_RESULT;
-
-  /// Release any unused reservation in excess of the node's initial reservation. Returns
-  /// an error if releasing the reservation requires flushing pages to disk, and that
-  /// fails. Not thread-safe if other threads are accessing this ExecNode or
-  /// 'buffer_pool_client_'.
-  Status ReleaseUnusedReservation() WARN_UNUSED_RESULT;
-
-  /// Enable the increase reservation denial probability on 'buffer_pool_client_' based on
-  /// the 'debug_action_' set on this node. Returns an error if 'debug_action_param_' is
-  /// invalid.
-  Status EnableDenyReservationDebugAction();
+  BufferPool::ClientHandle* buffer_pool_client() {
+    return reservation_manager_.buffer_pool_client();
+  }
+
+  Status ClaimBufferReservation(RuntimeState* state) WARN_UNUSED_RESULT {
+    return reservation_manager_.ClaimBufferReservation(state);
+  }
+
+  Status ReleaseUnusedReservation() WARN_UNUSED_RESULT {
+    return reservation_manager_.ReleaseUnusedReservation();
+  }
 
   /// Extends blocking queue for row batches. Row batches have a property that
   /// they must be processed in the order they were produced, even in cancellation
@@ -299,9 +291,7 @@ class ExecNode {
 
   /// debug-only: if debug_action_ is not INVALID, node will perform action in
   /// debug_phase_
-  TExecNodePhase::type debug_phase_;
-  TDebugAction::type debug_action_;
-  std::string debug_action_param_;
+  TDebugOptions debug_options_;
 
   int64_t limit_;  // -1: no limit
   int64_t num_rows_returned_;
@@ -328,12 +318,6 @@ class ExecNode {
   /// execution where the memory is not needed.
   boost::scoped_ptr<MemPool> expr_results_pool_;
 
-  /// Buffer pool client for this node. Initialized with the node's minimum reservation
-  /// in ClaimBufferReservation(). After initialization, the client must hold onto at
-  /// least the minimum reservation so that it can be returned to the initial
-  /// reservations pool in Close().
-  BufferPool::ClientHandle buffer_pool_client_;
-
   /// Pointer to the containing SubplanNode or NULL if not inside a subplan.
   /// Set by SubplanNode::Init(). Not owned.
   SubplanNode* containing_subplan_;
@@ -362,7 +346,7 @@ class ExecNode {
       TExecNodePhase::type phase, RuntimeState* state) WARN_UNUSED_RESULT {
     DCHECK_NE(phase, TExecNodePhase::INVALID);
     // Fast path for the common case when an action is not enabled for this phase.
-    if (LIKELY(debug_phase_ != phase)) return Status::OK();
+    if (LIKELY(debug_options_.phase != phase)) return Status::OK();
     return ExecDebugActionImpl(phase, state);
   }
 
@@ -381,6 +365,12 @@ class ExecNode {
   /// Set in ExecNode::Close(). Used to make Close() idempotent. This is not protected
   /// by a lock, it assumes all calls to Close() are made by the same thread.
   bool is_closed_;
+
+  /// Wraps the buffer pool client for this node. Initialized with the node's minimum
+  /// reservation in ClaimBufferReservation(). After initialization, the client must hold
+  /// onto at least the minimum reservation so that it can be returned to the initial
+  /// reservations pool in Close().
+  ReservationManager reservation_manager_;
 };
 
 inline bool ExecNode::EvalPredicate(ScalarExprEvaluator* eval, TupleRow* row) {
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 67f07cd..8307ba6 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -527,8 +527,8 @@ Status HdfsScanNodeBase::StartNextScanRange(int64_t* reservation,
     *reservation = IncreaseReservationIncrementally(*reservation, ideal_scan_range_reservation);
     initial_range_ideal_reservation_stats_->UpdateCounter(ideal_scan_range_reservation);
     initial_range_actual_reservation_stats_->UpdateCounter(*reservation);
-    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
-        &buffer_pool_client_, *scan_range, *reservation));
+    RETURN_IF_ERROR(
+        io_mgr->AllocateBuffersForRange(buffer_pool_client(), *scan_range, *reservation));
   }
   return Status::OK();
 }
@@ -544,7 +544,7 @@ int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservat
     int64_t target = min(ideal_reservation,
         BitUtil::RoundUpToPowerOf2(curr_reservation + 1, io_mgr->max_buffer_size()));
     DCHECK_LT(curr_reservation, target);
-    bool increased = buffer_pool_client_.IncreaseReservation(target - curr_reservation);
+    bool increased = buffer_pool_client()->IncreaseReservation(target - curr_reservation);
     VLOG_FILE << "Increasing reservation from "
               << PrettyPrinter::PrintBytes(curr_reservation) << " to "
               << PrettyPrinter::PrintBytes(target) << " "
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index b675b20..f3a2253 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -79,7 +79,7 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
       scanner_->Close(row_batch);
       scanner_.reset();
     }
-    int64_t scanner_reservation = buffer_pool_client_.GetReservation();
+    int64_t scanner_reservation = buffer_pool_client()->GetReservation();
     RETURN_IF_ERROR(StartNextScanRange(&scanner_reservation, &scan_range_));
     if (scan_range_ == nullptr) {
       *eos = true;
@@ -90,7 +90,7 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
         static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
     int64_t partition_id = metadata->partition_id;
     HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
-    scanner_ctx_.reset(new ScannerContext(runtime_state_, this, &buffer_pool_client_,
+    scanner_ctx_.reset(new ScannerContext(runtime_state_, this, buffer_pool_client(),
         scanner_reservation, partition, filter_ctxs(), expr_results_pool()));
     scanner_ctx_->AddStream(scan_range_, scanner_reservation);
     Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 7b911de..fe2eee7 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -255,10 +255,10 @@ void HdfsScanNode::ReturnReservationFromScannerThread(const unique_lock<mutex>&
     int64_t bytes) {
   DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
   // Release as much memory as possible. Must hold onto the minimum reservation, though.
-  Status status =
-      buffer_pool_client_.DecreaseReservationTo(bytes, resource_profile_.min_reservation);
+  Status status = buffer_pool_client()->DecreaseReservationTo(
+      bytes, resource_profile_.min_reservation);
   DCHECK(status.ok()) << "Not possible, scans don't unpin pages" << status.GetDetail();
-  DCHECK_GE(buffer_pool_client_.GetReservation(), resource_profile_.min_reservation);
+  DCHECK_GE(buffer_pool_client()->GetReservation(), resource_profile_.min_reservation);
 }
 
 void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
@@ -303,7 +303,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
       // Cases 5 and 6.
       if (materialized_row_batches_->Size() >= max_materialized_row_batches_) break;
       // The node's min reservation is for the first thread so we don't need to check
-      if (!buffer_pool_client_.IncreaseReservation(scanner_thread_reservation)) {
+      if (!buffer_pool_client()->IncreaseReservation(scanner_thread_reservation)) {
         COUNTER_ADD(scanner_thread_reservations_denied_counter_, 1);
         break;
       }
@@ -466,7 +466,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     return Status::OK();
   }
 
-  ScannerContext context(runtime_state_, this, &buffer_pool_client_,
+  ScannerContext context(runtime_state_, this, buffer_pool_client(),
       *scanner_thread_reservation, partition, filter_ctxs, expr_results_pool);
   context.AddStream(scan_range, *scanner_thread_reservation);
   scoped_ptr<HdfsScanner> scanner;
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 9fd653c..7e3c4d7 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -58,7 +58,7 @@ Status PartialSortNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
-      sort_tuple_exprs_, &row_descriptor_, mem_tracker(), &buffer_pool_client_,
+      sort_tuple_exprs_, &row_descriptor_, mem_tracker(), buffer_pool_client(),
       resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), false));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
@@ -82,7 +82,7 @@ Status PartialSortNode::Open(RuntimeState* state) {
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   RETURN_IF_ERROR(child(0)->Open(state));
-  if (!buffer_pool_client_.is_registered()) {
+  if (!buffer_pool_client()->is_registered()) {
     RETURN_IF_ERROR(ClaimBufferReservation(state));
   }
   return Status::OK();
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 3a90f14..b6b1752 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -251,7 +251,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
 
   // Claim reservation after the child has been opened to reduce the peak reservation
   // requirement.
-  if (!buffer_pool_client_.is_registered() && !grouping_exprs_.empty()) {
+  if (!buffer_pool_client()->is_registered() && !grouping_exprs_.empty()) {
     DCHECK_GE(resource_profile_.min_reservation, MinReservation());
     RETURN_IF_ERROR(ClaimBufferReservation(state));
   }
@@ -270,19 +270,19 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
     if (ht_allocator_ == nullptr) {
       // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() call.
       ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(),
-          &buffer_pool_client_, resource_profile_.spillable_buffer_size));
+          buffer_pool_client(), resource_profile_.spillable_buffer_size));
 
       if (!is_streaming_preagg_ && needs_serialize_) {
         serialize_stream_.reset(new BufferedTupleStream(state, &intermediate_row_desc_,
-            &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+            buffer_pool_client(), resource_profile_.spillable_buffer_size,
             resource_profile_.max_row_buffer_size));
         RETURN_IF_ERROR(serialize_stream_->Init(id(), false));
         bool got_buffer;
         // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up
         // another buffer during spilling.
         RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer));
-        DCHECK(got_buffer)
-            << "Accounted in min reservation" << buffer_pool_client_.DebugString();
+        DCHECK(got_buffer) << "Accounted in min reservation"
+                           << buffer_pool_client()->DebugString();
         DCHECK(serialize_stream_->has_write_iterator());
       }
     }
@@ -673,21 +673,21 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
     }
   }
 
-  aggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
-      &parent->intermediate_row_desc_, &parent->buffer_pool_client_,
-      parent->resource_profile_.spillable_buffer_size,
-      parent->resource_profile_.max_row_buffer_size, external_varlen_slots));
+  aggregated_row_stream.reset(
+      new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_,
+          parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
+          parent->resource_profile_.max_row_buffer_size, external_varlen_slots));
   RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id(), true));
   bool got_buffer;
   RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
   DCHECK(got_buffer) << "Buffer included in reservation " << parent->id_ << "\n"
-                     << parent->buffer_pool_client_.DebugString() << "\n"
+                     << parent->buffer_pool_client()->DebugString() << "\n"
                      << parent->DebugString(2);
   if (!parent->is_streaming_preagg_) {
-    unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
-        parent->child(0)->row_desc(), &parent->buffer_pool_client_,
-        parent->resource_profile_.spillable_buffer_size,
-        parent->resource_profile_.max_row_buffer_size));
+    unaggregated_row_stream.reset(
+        new BufferedTupleStream(parent->state_, parent->child(0)->row_desc(),
+            parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
+            parent->resource_profile_.max_row_buffer_size));
     // This stream is only used to spill, no need to ever have this pinned.
     RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), false));
     // Save memory by waiting until we spill to allocate the write buffer for the
@@ -755,10 +755,10 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
     // when we need to spill again. We need to have this available before we need
     // to spill to make sure it is available. This should be acquirable since we just
     // freed at least one buffer from this partition's (old) aggregated_row_stream.
-    parent->serialize_stream_.reset(new BufferedTupleStream(parent->state_,
-        &parent->intermediate_row_desc_, &parent->buffer_pool_client_,
-        parent->resource_profile_.spillable_buffer_size,
-        parent->resource_profile_.max_row_buffer_size));
+    parent->serialize_stream_.reset(
+        new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_,
+            parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
+            parent->resource_profile_.max_row_buffer_size));
     status = parent->serialize_stream_->Init(parent->id(), false);
     if (status.ok()) {
       bool got_buffer;
@@ -805,8 +805,8 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
     aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
     bool got_buffer;
     RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
-    DCHECK(got_buffer)
-        << "Accounted in min reservation" << parent->buffer_pool_client_.DebugString();
+    DCHECK(got_buffer) << "Accounted in min reservation"
+                       << parent->buffer_pool_client()->DebugString();
   }
 
   COUNTER_ADD(parent->num_spilled_partitions_, 1);
@@ -1135,7 +1135,7 @@ Status PartitionedAggregationNode::NextPartition() {
     // All partitions are in memory. Release reservation that was used for previous
     // partitions that is no longer needed. If we have spilled partitions, we want to
     // hold onto all reservation in case it is needed to process the spilled partitions.
-    DCHECK(!buffer_pool_client_.has_unpinned_pages());
+    DCHECK(!buffer_pool_client()->has_unpinned_pages());
     Status status = ReleaseUnusedReservation();
     DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are "
                         << "no unpinned pages. " << status.GetDetail();
@@ -1155,7 +1155,8 @@ Status PartitionedAggregationNode::NextPartition() {
     // No aggregated partitions in memory - we should not be using any reservation aside
     // from 'serialize_stream_'.
     DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0,
-        buffer_pool_client_.GetUsedReservation()) << buffer_pool_client_.DebugString();
+        buffer_pool_client()->GetUsedReservation())
+        << buffer_pool_client()->DebugString();
 
     // Try to fit a single spilled partition in memory. We can often do this because
     // we only need to fit 1/PARTITION_FANOUT of the data in memory.
@@ -1211,7 +1212,8 @@ Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_parti
     // Spilled the partition - we should not be using any reservation except from
     // 'serialize_stream_'.
     DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0,
-        buffer_pool_client_.GetUsedReservation()) << buffer_pool_client_.DebugString();
+        buffer_pool_client()->GetUsedReservation())
+        << buffer_pool_client()->DebugString();
   } else {
     *built_partition = dst_partition;
   }
@@ -1244,8 +1246,8 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() {
     bool got_buffer;
     RETURN_IF_ERROR(
         hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
-    DCHECK(got_buffer)
-        << "Accounted in min reservation" << buffer_pool_client_.DebugString();
+    DCHECK(got_buffer) << "Accounted in min reservation"
+                       << buffer_pool_client()->DebugString();
   }
   RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));
 
@@ -1312,7 +1314,8 @@ Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
     }
   }
   DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition to "
-                               << "reclaim memory: " << buffer_pool_client_.DebugString();
+                               << "reclaim memory: "
+                               << buffer_pool_client()->DebugString();
   // Remove references to the destroyed hash table from 'hash_tbls_'.
   // Additionally, we might be dealing with a rebuilt spilled partition, where all
   // partitions point to a single in-memory partition. This also ensures that 'hash_tbls_'
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 909a427..8281196 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -77,8 +77,8 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
   // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
   // being separated out further.
   builder_.reset(new PhjBuilder(id(), join_op_, child(0)->row_desc(),
-        child(1)->row_desc(), state, &buffer_pool_client_,
-        resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size));
+      child(1)->row_desc(), state, buffer_pool_client(),
+      resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size));
   RETURN_IF_ERROR(
       builder_->InitExprsAndFilters(state, eq_join_conjuncts, tnode.runtime_filters));
 
@@ -187,7 +187,7 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
 
 Status PartitionedHashJoinNode::AcquireResourcesForBuild(RuntimeState* state) {
   DCHECK_GE(resource_profile_.min_reservation, builder_->MinReservation());
-  if (!buffer_pool_client_.is_registered()) {
+  if (!buffer_pool_client()->is_registered()) {
     RETURN_IF_ERROR(ClaimBufferReservation(state));
   }
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
@@ -429,7 +429,7 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
     if (UNLIKELY(num_input_rows == largest_partition_rows)) {
       return Status(TErrorCode::PARTITIONED_HASH_JOIN_REPARTITION_FAILS, id_,
           next_partition_level, num_input_rows, NodeDebugString(),
-          buffer_pool_client_.DebugString());
+          buffer_pool_client()->DebugString());
     }
 
     RETURN_IF_ERROR(PrepareForProbe());
@@ -821,23 +821,22 @@ Status PartitionedHashJoinNode::NullAwareAntiJoinError(BufferedTupleStream* rows
       "was $1. $2/$3 of the join's reservation was available for the rows.",
       rows->num_rows(), PrettyPrinter::PrintBytes(rows->byte_size()),
       PrettyPrinter::PrintBytes(
-          buffer_pool_client_.GetUnusedReservation() + rows->BytesPinned(false)),
-      PrettyPrinter::PrintBytes(buffer_pool_client_.GetReservation())));
+          buffer_pool_client()->GetUnusedReservation() + rows->BytesPinned(false)),
+      PrettyPrinter::PrintBytes(buffer_pool_client()->GetReservation())));
 }
 
 Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
   RuntimeState* state = runtime_state_;
-  unique_ptr<BufferedTupleStream> probe_rows = make_unique<BufferedTupleStream>(
-      state, child(0)->row_desc(), &buffer_pool_client_,
-      resource_profile_.spillable_buffer_size,
-      resource_profile_.max_row_buffer_size);
+  unique_ptr<BufferedTupleStream> probe_rows =
+      make_unique<BufferedTupleStream>(state, child(0)->row_desc(), buffer_pool_client(),
+          resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size);
   Status status = probe_rows->Init(id(), true);
   if (!status.ok()) goto error;
   bool got_buffer;
   status = probe_rows->PrepareForWrite(&got_buffer);
   if (!status.ok()) goto error;
-  DCHECK(got_buffer)
-      << "Accounted in min reservation" << buffer_pool_client_.DebugString();
+  DCHECK(got_buffer) << "Accounted in min reservation"
+                     << buffer_pool_client()->DebugString();
   null_aware_probe_partition_.reset(new ProbePartition(
       state, this, builder_->null_aware_partition(), std::move(probe_rows)));
   return Status::OK();
@@ -851,15 +850,15 @@ error:
 
 Status PartitionedHashJoinNode::InitNullProbeRows() {
   RuntimeState* state = runtime_state_;
-  null_probe_rows_ = make_unique<BufferedTupleStream>(state, child(0)->row_desc(),
-      &buffer_pool_client_, resource_profile_.spillable_buffer_size,
-      resource_profile_.max_row_buffer_size);
+  null_probe_rows_ =
+      make_unique<BufferedTupleStream>(state, child(0)->row_desc(), buffer_pool_client(),
+          resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size);
   // Start with stream pinned, unpin later if needed.
   RETURN_IF_ERROR(null_probe_rows_->Init(id(), true));
   bool got_buffer;
   RETURN_IF_ERROR(null_probe_rows_->PrepareForWrite(&got_buffer));
-  DCHECK(got_buffer)
-      << "Accounted in min reservation" << buffer_pool_client_.DebugString();
+  DCHECK(got_buffer) << "Accounted in min reservation"
+                     << buffer_pool_client()->DebugString();
   return Status::OK();
 }
 
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 546f3ba..f3e3ffc 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -52,10 +52,9 @@ Status SortNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status SortNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
-  sorter_.reset(
-      new Sorter(ordering_exprs_, is_asc_order_, nulls_first_, sort_tuple_exprs_,
-          &row_descriptor_, mem_tracker(), &buffer_pool_client_,
-          resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), true));
+  sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
+      sort_tuple_exprs_, &row_descriptor_, mem_tracker(), buffer_pool_client(),
+      resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), true));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   state->CheckAndAddCodegenDisabledMessage(runtime_profile());
@@ -76,7 +75,7 @@ Status SortNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(child(0)->Open(state));
   // Claim reservation after the child has been opened to reduce the peak reservation
   // requirement.
-  if (!buffer_pool_client_.is_registered()) {
+  if (!buffer_pool_client()->is_registered()) {
     RETURN_IF_ERROR(ClaimBufferReservation(state));
   }
   RETURN_IF_ERROR(sorter_->Open());
@@ -108,7 +107,7 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     // for the next subplan iteration or merging spilled runs.
     returned_buffer_ = false;
     if (!IsInSubplan() && !sorter_->HasSpilledRuns()) {
-      DCHECK(!buffer_pool_client_.has_unpinned_pages());
+      DCHECK(!buffer_pool_client()->has_unpinned_pages());
       Status status = ReleaseUnusedReservation();
       DCHECK(status.ok()) << "Should not fail - no runs were spilled so no pages are "
                           << "unpinned. " << status.GetDetail();
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index a55a394..2fea5bd 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -58,6 +58,7 @@ add_library(Runtime
   types.cc
   raw-value.cc
   raw-value-ir.cc
+  reservation-manager.cc
   row-batch.cc
   ${ROW_BATCH_PROTO_SRCS}
   runtime-filter.cc
diff --git a/be/src/runtime/reservation-manager.cc b/be/src/runtime/reservation-manager.cc
new file mode 100644
index 0000000..f16b48f
--- /dev/null
+++ b/be/src/runtime/reservation-manager.cc
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/reservation-manager.h"
+
+#include "gutil/strings/substitute.h"
+#include "runtime/exec-env.h"
+#include "runtime/initial-reservations.h"
+#include "runtime/query-state.h"
+#include "runtime/runtime-state.h"
+#include "util/string-parser.h"
+
+using strings::Substitute;
+
+namespace impala {
+
+void ReservationManager::Init(string name, RuntimeProfile* runtime_profile,
+    MemTracker* mem_tracker, const TBackendResourceProfile& resource_profile,
+    const TDebugOptions& debug_options) {
+  name_ = name;
+  runtime_profile_ = runtime_profile;
+  mem_tracker_ = mem_tracker;
+  resource_profile_ = resource_profile;
+  debug_options_ = debug_options;
+}
+
+void ReservationManager::Close(RuntimeState* state) {
+  if (buffer_pool_client_.is_registered()) {
+    VLOG_FILE << name_ << " returning reservation " << resource_profile_.min_reservation;
+    state->query_state()->initial_reservations()->Return(
+        &buffer_pool_client_, resource_profile_.min_reservation);
+    state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
+  }
+}
+
+Status ReservationManager::ClaimBufferReservation(RuntimeState* state) {
+  DCHECK(!buffer_pool_client_.is_registered());
+  BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool();
+  // Check the minimum buffer size in case the minimum buffer size used by the planner
+  // doesn't match this backend's.
+  if (resource_profile_.__isset.spillable_buffer_size
+      && resource_profile_.spillable_buffer_size < buffer_pool->min_buffer_len()) {
+    return Status(Substitute("Spillable buffer size for $0 of $1 bytes is less "
+                             "than the minimum buffer pool buffer size of $2 bytes",
+        name_, resource_profile_.spillable_buffer_size, buffer_pool->min_buffer_len()));
+  }
+
+  RETURN_IF_ERROR(buffer_pool->RegisterClient(name_, state->query_state()->file_group(),
+      state->instance_buffer_reservation(), mem_tracker_,
+      resource_profile_.max_reservation, runtime_profile_, &buffer_pool_client_));
+  VLOG_FILE << name_ << " claiming reservation " << resource_profile_.min_reservation;
+  state->query_state()->initial_reservations()->Claim(
+      &buffer_pool_client_, resource_profile_.min_reservation);
+  if (debug_options_.action == TDebugAction::SET_DENY_RESERVATION_PROBABILITY
+      && (debug_options_.phase == TExecNodePhase::PREPARE
+             || debug_options_.phase == TExecNodePhase::OPEN)) {
+    // We may not have been able to enable the debug action at the start of Prepare() or
+    // Open() because the client is not registered then. Do it now to be sure that it is
+    // effective.
+    RETURN_IF_ERROR(EnableDenyReservationDebugAction());
+  }
+  return Status::OK();
+}
+
+Status ReservationManager::ReleaseUnusedReservation() {
+  return buffer_pool_client_.DecreaseReservationTo(
+      buffer_pool_client_.GetUnusedReservation(), resource_profile_.min_reservation);
+}
+
+Status ReservationManager::EnableDenyReservationDebugAction() {
+  DCHECK_EQ(debug_options_.action, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
+  // Parse [0.0, 1.0] probability.
+  StringParser::ParseResult parse_result;
+  double probability =
+      StringParser::StringToFloat<double>(debug_options_.action_param.c_str(),
+          debug_options_.action_param.size(), &parse_result);
+  if (parse_result != StringParser::PARSE_SUCCESS || probability < 0.0
+      || probability > 1.0) {
+    return Status(Substitute("Invalid SET_DENY_RESERVATION_PROBABILITY param: '$0'",
+        debug_options_.action_param));
+  }
+  buffer_pool_client_.SetDebugDenyIncreaseReservation(probability);
+  return Status::OK();
+}
+
+} // namespace impala
diff --git a/be/src/runtime/reservation-manager.h b/be/src/runtime/reservation-manager.h
new file mode 100644
index 0000000..0545548
--- /dev/null
+++ b/be/src/runtime/reservation-manager.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_RESERVATION_MANAGER_H
+#define IMPALA_RUNTIME_RESERVATION_MANAGER_H
+
+#include "common/status.h"
+#include "runtime/bufferpool/buffer-pool.h"
+
+namespace impala {
+
+class MemTracker;
+class RuntimeProfile;
+class RuntimeState;
+class TBackendResourceProfile;
+class TDebugOptions;
+
+/// A ReservationManager provides a wrapper around a TBackendResourceProfile and
+/// a BufferPool::ClientHandle, with functionality for claiming and releasing the
+/// memory from the resource profile.
+class ReservationManager {
+ public:
+  ReservationManager() {}
+
+  /// Initialize this ReservationManager with the given values. 'name' is used in logging
+  /// and error messages, 'debug_options' is used for the SET_DENY_RESERVATION_PROBABILITY
+  /// action. Does not take ownership of 'runtime_profile' and 'mem_tracker'. Must be
+  /// called before ClaimBufferReservation().
+  void Init(std::string name, RuntimeProfile* runtime_profile, MemTracker* mem_tracker,
+      const TBackendResourceProfile& resource_profile,
+      const TDebugOptions& debug_options);
+  void Close(RuntimeState* state);
+
+  BufferPool::ClientHandle* buffer_pool_client() { return &buffer_pool_client_; }
+
+  /// Initialize 'buffer_pool_client_' and claim the initial reservation. The client is
+  /// cleaned up in Close(). Should not be called if the client is already open.
+  ///
+  /// The initial reservation must be returned to
+  /// QueryState::initial_reservations(), which is done automatically in Close() as long
+  /// as the initial reservation is not released before Close().
+  Status ClaimBufferReservation(RuntimeState* state) WARN_UNUSED_RESULT;
+
+  /// Release any unused reservation in excess of the initial reservation. Returns an
+  /// error if releasing the reservation requires flushing pages to disk, and that fails.
+  /// Not thread-safe if other threads are accessing 'buffer_pool_client_'.
+  Status ReleaseUnusedReservation() WARN_UNUSED_RESULT;
+
+  /// Enable the increase reservation denial probability on 'buffer_pool_client_' based
+  /// on 'debug_options_'. Returns an error if 'debug_options_.debug_action_param_'
+  /// is invalid.
+  Status EnableDenyReservationDebugAction();
+
+ private:
+  std::string name_;
+
+  RuntimeProfile* runtime_profile_;
+
+  MemTracker* mem_tracker_;
+
+  /// Buffer pool client for this node. Initialized with the node's minimum reservation
+  /// in ClaimBufferReservation(). After initialization, the client must hold onto at
+  /// least the minimum reservation so that it can be returned to the initial
+  /// reservations pool in Close().
+  BufferPool::ClientHandle buffer_pool_client_;
+
+  /// Resource information sent from the frontend.
+  TBackendResourceProfile resource_profile_;
+
+  TDebugOptions debug_options_;
+};
+
+} // namespace impala
+
+#endif // IMPALA_RUNTIME_RESERVATION_MANAGER_H