Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/13 00:21:46 UTC

[3/3] incubator-impala git commit: IMPALA-4833: Compute precise per-host reservation size

IMPALA-4833: Compute precise per-host reservation size

Before this change, the per-host reservation size was computed
by the Planner. However, scheduling happens after planning,
so the Planner had to assume that all fragments run on all
hosts, which makes the computed reservation size likely much
larger than it needs to be.

This change moves the computation of the per-host reservation
size to the BE, where it can be done precisely once the
schedule is known. It also includes a number of plan/profile
changes.
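
For readers skimming the diff, the core of the new BE-side logic is a
per-backend accumulation over the scheduled fragment instances. Below is a
minimal standalone C++ sketch of that idea; the types and names here
(FragmentInstance, BackendParams, ComputePerBackendParams) are simplified
placeholders, not Impala source. The real logic lives in
Scheduler::ComputeBackendExecParams() in be/src/scheduling/scheduler.cc
further down in this diff.

    // Standalone sketch: sum per-fragment-instance reservations per backend.
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    struct FragmentInstance {
      std::string host;                         // backend the instance runs on
      int64_t min_reservation_bytes;            // fragment's minimum reservation
      int64_t initial_reservation_total_bytes;  // fragment's initial reservation total
    };

    struct BackendParams {
      int64_t min_reservation_bytes = 0;
      int64_t initial_reservation_total_bytes = 0;
    };

    // Fragments do not synchronize Open()/Close(), so conservatively assume all
    // instances on a backend hold their peak reservations at the same time and
    // sum them per host.
    std::map<std::string, BackendParams> ComputePerBackendParams(
        const std::vector<FragmentInstance>& instances) {
      std::map<std::string, BackendParams> per_backend;
      for (const FragmentInstance& inst : instances) {
        BackendParams& be = per_backend[inst.host];
        be.min_reservation_bytes += inst.min_reservation_bytes;
        be.initial_reservation_total_bytes += inst.initial_reservation_total_bytes;
      }
      return per_backend;
    }

    int main() {
      // Two instances land on host-1 and one on host-2, so only host-1 is
      // charged twice, instead of every host being charged the maximum.
      std::vector<FragmentInstance> instances = {
          {"host-1", 1 << 20, 2 << 20},
          {"host-1", 1 << 20, 2 << 20},
          {"host-2", 1 << 20, 2 << 20},
      };
      for (const auto& entry : ComputePerBackendParams(instances)) {
        std::cout << entry.first << ": min=" << entry.second.min_reservation_bytes
                  << " initial_total=" << entry.second.initial_reservation_total_bytes
                  << "\n";
      }
      return 0;
    }

Compared to the old Planner-side estimate, which effectively charged every
host for every fragment, this per-backend sum only charges a host for the
instances actually scheduled on it.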

Change-Id: Idbcd1e9b1be14edc4017b4907e83f9d56059fbac
Reviewed-on: http://gerrit.cloudera.org:8080/7630
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6c125465
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6c125465
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6c125465

Branch: refs/heads/master
Commit: 6c1254656186b62e90674b4fe093a6864ccbbde5
Parents: f2f52a8
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Wed Aug 9 13:42:24 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Aug 12 08:10:07 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.cc     |  22 +--
 be/src/runtime/coordinator-backend-state.h      |  16 ++-
 be/src/runtime/coordinator.cc                   |  14 +-
 be/src/runtime/query-state.cc                   |   4 +-
 be/src/scheduling/admission-controller.cc       |   7 +-
 be/src/scheduling/query-schedule.cc             |  10 +-
 be/src/scheduling/query-schedule.h              |  37 ++++-
 be/src/scheduling/scheduler-test-util.cc        |   4 +-
 be/src/scheduling/scheduler.cc                  |  45 ++++--
 be/src/scheduling/scheduler.h                   |   4 +
 be/src/service/client-request-state.cc          |   6 -
 be/src/service/impala-server.cc                 |  18 +--
 common/thrift/Frontend.thrift                   |  11 +-
 common/thrift/ImpalaInternalService.thrift      |  25 ++--
 common/thrift/Planner.thrift                    |   8 ++
 .../org/apache/impala/planner/PlanFragment.java |  39 ++++--
 .../java/org/apache/impala/planner/Planner.java |  59 ++++----
 .../queries/PlannerTest/disable-codegen.test    |  20 +--
 .../PlannerTest/resource-requirements.test      | 140 +++++++++----------
 .../PlannerTest/spillable-buffer-sizing.test    |  36 ++---
 .../queries/QueryTest/explain-level0.test       |   2 +-
 .../queries/QueryTest/explain-level1.test       |   2 +-
 .../queries/QueryTest/explain-level2.test       |   2 +-
 .../queries/QueryTest/explain-level3.test       |   2 +-
 .../queries/QueryTest/stats-extrapolation.test  |  10 +-
 tests/custom_cluster/test_coordinators.py       |   2 +-
 tests/custom_cluster/test_mem_reservations.py   |  86 ++++++++++++
 27 files changed, 390 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 93a3af2..195880f 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -60,16 +60,17 @@ Coordinator::BackendState::BackendState(
 }
 
 void Coordinator::BackendState::Init(
-    const vector<const FInstanceExecParams*>& instance_params_list,
+    const BackendExecParams& exec_params,
     const vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool) {
-  instance_params_list_ = instance_params_list;
-  host_ = instance_params_list_[0]->host;
-  num_remaining_instances_ = instance_params_list.size();
+  backend_exec_params_ = &exec_params;
+  host_ = backend_exec_params_->instance_params[0]->host;
+  num_remaining_instances_ = backend_exec_params_->instance_params.size();
 
   // populate instance_stats_map_ and install instance
   // profiles as child profiles in fragment_stats' profile
   int prev_fragment_idx = -1;
-  for (const FInstanceExecParams* instance_params: instance_params_list) {
+  for (const FInstanceExecParams* instance_params:
+       backend_exec_params_->instance_params) {
     DCHECK_EQ(host_, instance_params->host);  // all hosts must be the same
     int fragment_idx = instance_params->fragment().idx;
     DCHECK_LT(fragment_idx, fragment_stats.size());
@@ -92,12 +93,17 @@ void Coordinator::BackendState::SetRpcParams(
     TExecQueryFInstancesParams* rpc_params) {
   rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
   rpc_params->__set_coord_state_idx(state_idx_);
+  rpc_params->__set_min_reservation_bytes(backend_exec_params_->min_reservation_bytes);
+  rpc_params->__set_initial_reservation_total_bytes(
+      backend_exec_params_->initial_reservation_total_bytes);
 
   // set fragment_ctxs and fragment_instance_ctxs
-  rpc_params->fragment_instance_ctxs.resize(instance_params_list_.size());
-  for (int i = 0; i < instance_params_list_.size(); ++i) {
+  rpc_params->__isset.fragment_ctxs = true;
+  rpc_params->__isset.fragment_instance_ctxs = true;
+  rpc_params->fragment_instance_ctxs.resize(backend_exec_params_->instance_params.size());
+  for (int i = 0; i < backend_exec_params_->instance_params.size(); ++i) {
     TPlanFragmentInstanceCtx& instance_ctx = rpc_params->fragment_instance_ctxs[i];
-    const FInstanceExecParams& params = *instance_params_list_[i];
+    const FInstanceExecParams& params = *backend_exec_params_->instance_params[i];
     int fragment_idx = params.fragment_exec_params.fragment.idx;
 
     // add a TPlanFragmentCtx, if we don't already have it

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 022df47..d4762af 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -32,7 +32,6 @@
 namespace impala {
 
 class ProgressUpdater;
-class FInstanceExecParams;
 class ObjectPool;
 class DebugOptions;
 class CountingBarrier;
@@ -40,6 +39,7 @@ class TUniqueId;
 class TQueryCtx;
 class TReportExecStatusParams;
 class ExecSummary;
+struct FInstanceExecParams;
 
 /// This class manages all aspects of the execution of all fragment instances of a
 /// single query on a particular backend.
@@ -49,11 +49,11 @@ class Coordinator::BackendState {
   BackendState(const TUniqueId& query_id, int state_idx,
       TRuntimeFilterMode::type filter_mode);
 
-  /// Creates InstanceStats for all entries in instance_params_list in obj_pool
+  /// Creates InstanceStats for all instances in backend_exec_params in obj_pool
   /// and installs the instance profiles as children of the corresponding FragmentStats'
   /// root profile.
   /// Separated from c'tor to simplify future handling of out-of-mem errors.
-  void Init(const vector<const FInstanceExecParams*>& instance_params_list,
+  void Init(const BackendExecParams& backend_exec_params,
       const std::vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool);
 
   /// Starts query execution at this backend by issuing an ExecQueryFInstances rpc and
@@ -102,7 +102,10 @@ class Coordinator::BackendState {
   const TNetworkAddress& impalad_address() const { return host_; }
   int state_idx() const { return state_idx_; }
 
-  /// only valid after Exec()
+  /// Valid after Init().
+  const BackendExecParams* exec_params() const { return backend_exec_params_; }
+
+  /// Only valid after Exec().
   int64_t rpc_latency() const { return rpc_latency_; }
 
   /// Return true if execution at this backend is done.
@@ -166,9 +169,8 @@ class Coordinator::BackendState {
   const int state_idx_;  /// index of 'this' in Coordinator::backend_states_
   const TRuntimeFilterMode::type filter_mode_;
 
-  /// all instances of a particular fragment are contiguous in this vector;
-  /// query lifetime
-  std::vector<const FInstanceExecParams*> instance_params_list_;
+  /// Backend exec params, owned by the QuerySchedule; valid for the query's lifetime.
+  const BackendExecParams* backend_exec_params_;
 
   /// map from instance idx to InstanceStats, the latter live in the obj_pool parameter
   /// of Init()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 2ebbfdc..9687dcc 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -225,25 +225,15 @@ void Coordinator::InitFragmentStats() {
 }
 
 void Coordinator::InitBackendStates() {
-  int num_backends = schedule_.unique_hosts().size();
+  int num_backends = schedule_.per_backend_exec_params().size();
   DCHECK_GT(num_backends, 0);
   backend_states_.resize(num_backends);
 
-  // collect the FInstanceExecParams for each host
-  typedef map<TNetworkAddress, vector<const FInstanceExecParams*>> BackendParamsMap;
-  BackendParamsMap backend_params_map;
-  for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) {
-    for (const FInstanceExecParams& instance_params:
-        fragment_params.instance_exec_params) {
-      backend_params_map[instance_params.host].push_back(&instance_params);
-    }
-  }
-
   // create BackendStates
   bool has_coord_fragment = schedule_.GetCoordFragment() != nullptr;
   const TNetworkAddress& coord_address = ExecEnv::GetInstance()->backend_address();
   int backend_idx = 0;
-  for (const auto& entry: backend_params_map) {
+  for (const auto& entry: schedule_.per_backend_exec_params()) {
     if (has_coord_fragment && coord_address == entry.first) {
       coord_backend_idx_ = backend_idx;
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 92a6b7b..c279fa0 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -139,9 +139,9 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   // to handle releasing it if a later step fails.
   initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_,
       buffer_reservation_, query_mem_tracker_,
-      query_ctx_.per_host_initial_reservation_total_claims));
+      rpc_params.initial_reservation_total_bytes));
   RETURN_IF_ERROR(
-      initial_reservations_->Init(query_id(), query_ctx_.per_host_min_reservation));
+      initial_reservations_->Init(query_id(), rpc_params.min_reservation_bytes));
   DCHECK_EQ(0, initial_reservation_refcnt_.Load());
   initial_reservation_refcnt_.Add(1); // Decremented in QueryExecMgr::StartQueryHelper().
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 543fdf4..07f473f 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -296,8 +296,8 @@ void AdmissionController::PoolStats::Dequeue(const QuerySchedule& schedule,
 
 void AdmissionController::UpdateHostMemAdmitted(const QuerySchedule& schedule,
     int64_t per_node_mem) {
-  const unordered_set<TNetworkAddress>& hosts = schedule.unique_hosts();
-  for (const TNetworkAddress& host_addr: hosts) {
+  for (const auto& entry : schedule.per_backend_exec_params()) {
+    const TNetworkAddress& host_addr = entry.first;
     const string host = TNetworkAddressToString(host_addr);
     VLOG_ROW << "Update admitted mem reserved for host=" << host
              << " prev=" << PrintBytes(host_mem_admitted_[host])
@@ -337,7 +337,8 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
 
   // Case 2:
   int64_t proc_mem_limit = GetProcMemLimit();
-  for (const TNetworkAddress& host: schedule.unique_hosts()) {
+  for (const auto& entry : schedule.per_backend_exec_params()) {
+    const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
     int64_t mem_reserved = host_mem_reserved_[host_id];
     int64_t mem_admitted = host_mem_admitted_[host_id];

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index a035c7d..244bef3 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -168,11 +168,13 @@ void QuerySchedule::Validate() const {
       }
     }
   }
+  // TODO: add validation for BackendExecParams
 }
 
 int64_t QuerySchedule::GetClusterMemoryEstimate() const {
-  DCHECK_GT(unique_hosts_.size(), 0);
-  const int64_t total_cluster_mem = GetPerHostMemoryEstimate() * unique_hosts_.size();
+  DCHECK_GT(per_backend_exec_params_.size(), 0);
+  const int64_t total_cluster_mem =
+      GetPerHostMemoryEstimate() * per_backend_exec_params_.size();
   DCHECK_GE(total_cluster_mem, 0); // Assume total cluster memory fits in an int64_t.
   return total_cluster_mem;
 }
@@ -206,10 +208,6 @@ int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
   return min(per_host_mem, MemInfo::physical_mem());
 }
 
-void QuerySchedule::SetUniqueHosts(const unordered_set<TNetworkAddress>& unique_hosts) {
-  unique_hosts_ = unique_hosts;
-}
-
 TUniqueId QuerySchedule::GetNextInstanceId() {
   TUniqueId result = next_instance_id_;
   ++next_instance_id_.lo;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 8a985c7..adb39ef 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -21,7 +21,6 @@
 #include <vector>
 #include <string>
 #include <unordered_map>
-#include <boost/unordered_set.hpp>
 #include <boost/scoped_ptr.hpp>
 
 #include "common/global-types.h"
@@ -36,6 +35,7 @@ namespace impala {
 
 class Coordinator;
 struct FragmentExecParams;
+struct FInstanceExecParams;
 
 /// map from scan node id to a list of scan ranges
 typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
@@ -45,6 +45,26 @@ typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
 typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
     FragmentScanRangeAssignment;
 
+/// Execution parameters for a single backend. Computed by Scheduler::Schedule(), set
+/// via QuerySchedule::set_per_backend_exec_params(). Used as an input to a BackendState.
+struct BackendExecParams {
+  /// The fragment instance params assigned to this backend. All instances of a
+  /// particular fragment are contiguous in this vector. Query lifetime;
+  /// FInstanceExecParams are owned by QuerySchedule::fragment_exec_params_.
+  std::vector<const FInstanceExecParams*> instance_params;
+
+  // The minimum reservation size (in bytes) required for all fragments in
+  // instance_params.
+  int64_t min_reservation_bytes;
+
+  // The total of initial reservations (in bytes) that will be claimed over the lifetime
+  // of this query for the fragments in instance_params.
+  int64_t initial_reservation_total_bytes;
+};
+
+/// map from an impalad host address to the list of assigned fragment instance params.
+typedef std::map<TNetworkAddress, BackendExecParams> PerBackendExecParams;
+
 /// execution parameters for a single fragment instance; used to assemble the
 /// TPlanFragmentInstanceCtx
 struct FInstanceExecParams {
@@ -167,6 +187,9 @@ class QuerySchedule {
     return fragment.plan.nodes[plan_node_to_plan_node_idx_[id]];
   }
 
+  const PerBackendExecParams& per_backend_exec_params() const {
+    return per_backend_exec_params_;
+  }
   const std::vector<FragmentExecParams>& fragment_exec_params() const {
     return fragment_exec_params_;
   }
@@ -179,15 +202,14 @@ class QuerySchedule {
 
   const FInstanceExecParams& GetCoordInstanceExecParams() const;
 
-  const boost::unordered_set<TNetworkAddress>& unique_hosts() const {
-    return unique_hosts_;
-  }
   bool is_admitted() const { return is_admitted_; }
   void set_is_admitted(bool is_admitted) { is_admitted_ = is_admitted; }
   RuntimeProfile* summary_profile() { return summary_profile_; }
   RuntimeProfile::EventSequence* query_events() { return query_events_; }
 
-  void SetUniqueHosts(const boost::unordered_set<TNetworkAddress>& unique_hosts);
+  void set_per_backend_exec_params(const PerBackendExecParams& params) {
+    per_backend_exec_params_ = params;
+  }
 
  private:
   /// These references are valid for the lifetime of this query schedule because they
@@ -213,8 +235,9 @@ class QuerySchedule {
   // (TPlanFragment.idx)
   std::vector<FragmentExecParams> fragment_exec_params_;
 
-  /// The set of hosts that the query will run on including the coordinator.
-  boost::unordered_set<TNetworkAddress> unique_hosts_;
+  // Map of host address to list of assigned FInstanceExecParams*, which
+  // reference fragment_exec_params_. Computed in Scheduler::Schedule().
+  PerBackendExecParams per_backend_exec_params_;
 
   /// Total number of scan ranges of this query.
   int64_t num_scan_ranges_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 7547782..1f233d2 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -16,9 +16,11 @@
 // under the License.
 
 #include "scheduling/scheduler-test-util.h"
-#include "scheduling/scheduler.h"
+
+#include <boost/unordered_set.hpp>
 
 #include "common/names.h"
+#include "scheduling/scheduler.h"
 
 using namespace impala;
 using namespace impala::test;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index d746785..619c89c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -24,6 +24,7 @@
 #include <boost/algorithm/string/join.hpp>
 #include <boost/bind.hpp>
 #include <boost/mem_fn.hpp>
+#include <boost/unordered_set.hpp>
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
@@ -678,27 +679,49 @@ void Scheduler::GetScanHosts(TPlanNodeId scan_id, const FragmentExecParams& para
 }
 
 Status Scheduler::Schedule(QuerySchedule* schedule) {
-  string resolved_pool;
-  RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(
-      schedule->request().query_ctx, &resolved_pool));
-  schedule->set_request_pool(resolved_pool);
-  schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool);
-
   RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule));
   ComputeFragmentExecParams(schedule);
+  ComputeBackendExecParams(schedule);
 #ifndef NDEBUG
   schedule->Validate();
 #endif
 
-  // compute unique hosts
-  unordered_set<TNetworkAddress> unique_hosts;
+  // TODO: Move to admission control, it doesn't need to be in the Scheduler.
+  string resolved_pool;
+  RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(
+      schedule->request().query_ctx, &resolved_pool));
+  schedule->set_request_pool(resolved_pool);
+  schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool);
+  return Status::OK();
+}
+
+void Scheduler::ComputeBackendExecParams(QuerySchedule* schedule) {
+  PerBackendExecParams per_backend_params;
   for (const FragmentExecParams& f : schedule->fragment_exec_params()) {
     for (const FInstanceExecParams& i : f.instance_exec_params) {
-      unique_hosts.insert(i.host);
+      BackendExecParams& be_params = per_backend_params[i.host];
+      be_params.instance_params.push_back(&i);
+      // Different fragments do not synchronize their Open() and Close(), so the backend
+      // does not provide strong guarantees about whether one fragment instance releases
+      // resources before another acquires them. Conservatively assume that all fragment
+      // instances on this backend can consume their peak resources at the same time,
+      // i.e. that this backend's peak resources is the sum of the per-fragment-instance
+      // peak resources for the instances executing on this backend.
+      be_params.min_reservation_bytes += f.fragment.min_reservation_bytes;
+      be_params.initial_reservation_total_bytes +=
+          f.fragment.initial_reservation_total_bytes;
     }
   }
-  schedule->SetUniqueHosts(unique_hosts);
-  return Status::OK();
+  schedule->set_per_backend_exec_params(per_backend_params);
+
+  stringstream min_reservation_ss;
+  for (const auto& e: per_backend_params) {
+    min_reservation_ss << e.first << "("
+         << PrettyPrinter::Print(e.second.min_reservation_bytes, TUnit::BYTES)
+         << ") ";
+  }
+  schedule->summary_profile()->AddInfoString("Per Host Min Reservation",
+      min_reservation_ss.str());
 }
 
 Scheduler::AssignmentCtx::AssignmentCtx(const BackendConfig& executor_config,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 043deac..3126436 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -400,6 +400,10 @@ class Scheduler {
       const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
       FragmentScanRangeAssignment* assignment);
 
+  /// Computes BackendExecParams for all backends assigned in the query. Must be called
+  /// after ComputeFragmentExecParams().
+  void ComputeBackendExecParams(QuerySchedule* schedule);
+
   /// Compute the FragmentExecParams for all plans in the schedule's
   /// TQueryExecRequest.plan_exec_info.
   /// This includes the routing information (destinations, per_exch_num_senders,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index bf0f9b4..d75e639 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -57,7 +57,6 @@ namespace impala {
 // Keys into the info string map of the runtime profile referring to specific
 // items used by CM for monitoring purposes.
 static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
-static const string PER_HOST_MEMORY_RESERVATION_KEY = "Per-Host Memory Reservation";
 static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
 static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats";
 static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids";
@@ -399,11 +398,6 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     ss << query_exec_request.per_host_mem_estimate;
     summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
   }
-  if (query_exec_request.query_ctx.__isset.per_host_min_reservation) {
-    stringstream ss;
-    ss << query_exec_request.query_ctx.per_host_min_reservation;
-    summary_profile_.AddInfoString(PER_HOST_MEMORY_RESERVATION_KEY, ss.str());
-  }
   if (!query_exec_request.query_ctx.__isset.parent_query_id &&
       query_exec_request.query_ctx.__isset.tables_missing_stats &&
       !query_exec_request.query_ctx.tables_missing_stats.empty()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index b476a06..5778041 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -878,11 +878,12 @@ Status ImpalaServer::ExecuteInternal(
   }
 
   if ((*request_state)->coord() != nullptr) {
-    const unordered_set<TNetworkAddress>& unique_hosts =
-        (*request_state)->schedule()->unique_hosts();
-    if (!unique_hosts.empty()) {
+    const PerBackendExecParams& per_backend_params =
+        (*request_state)->schedule()->per_backend_exec_params();
+    if (!per_backend_params.empty()) {
       lock_guard<mutex> l(query_locations_lock_);
-      for (const TNetworkAddress& host: unique_hosts) {
+      for (const auto& entry : per_backend_params) {
+        const TNetworkAddress& host = entry.first;
         query_locations_[host].insert((*request_state)->query_id());
       }
     }
@@ -1013,11 +1014,12 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
     request_state->summary_profile()->AddInfoString("Errors",
         request_state->coord()->GetErrorLog());
 
-    const unordered_set<TNetworkAddress>& unique_hosts =
-        request_state->schedule()->unique_hosts();
-    if (!unique_hosts.empty()) {
+    const PerBackendExecParams& per_backend_params =
+        request_state->schedule()->per_backend_exec_params();
+    if (!per_backend_params.empty()) {
       lock_guard<mutex> l(query_locations_lock_);
-      for (const TNetworkAddress& hostport: unique_hosts) {
+      for (const auto& entry : per_backend_params) {
+        const TNetworkAddress& hostport = entry.first;
         // Query may have been removed already by cancellation path. In particular, if
         // node to fail was last sender to an exchange, the coordinator will realise and
         // fail the query at the same time the failure detection path does the same

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 3a88915..c874f6f 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -386,14 +386,19 @@ struct TQueryExecRequest {
   // AS SELECT), these may differ.
   7: required Types.TStmtType stmt_type
 
-  // Estimated per-host peak memory consumption in bytes. Used for resource management.
-  8: optional i64 per_host_mem_estimate
-
   // List of replica hosts.  Used by the host_idx field of TScanRangeLocation.
   9: required list<Types.TNetworkAddress> host_list
 
   // Column lineage graph
   10: optional LineageGraph.TLineageGraph lineage_graph
+
+  // Estimated per-host peak memory consumption in bytes. Used by admission control.
+  // TODO: Remove when AC doesn't rely on this any more.
+  8: optional i64 per_host_mem_estimate
+
+  // Maximum possible (in the case all fragments are scheduled on all hosts with
+  // max DOP) minimum reservation required per host, in bytes.
+  11: optional i64 max_per_host_min_reservation;
 }
 
 enum TCatalogOpType {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index b1084ed..7328cc2 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -387,18 +387,6 @@ struct TQueryCtx {
   // String containing a timestamp (in UTC) set as the query submission time. It
   // represents the same point in time as now_string
   17: required string utc_timestamp_string
-
-  // Minimum query-wide buffer reservation required per host in bytes. This is the peak
-  // minimum reservation that may be required by the concurrently-executing operators at
-  // any point in query execution. It may be less than the initial reservation total
-  // claims (below) if execution of some operators never overlaps, which allows reuse of
-  // reservations.
-  18: optional i64 per_host_min_reservation;
-
-  // Total of the initial buffer reservations that we expect to be claimed per host.
-  // I.e. the sum over all operators in all fragment instances that execute on that host.
-  // Measured in bytes.
-  19: optional i64 per_host_initial_reservation_total_claims;
 }
 
 // Specification of one output destination of a plan fragment
@@ -486,11 +474,20 @@ struct TExecQueryFInstancesParams {
   3: optional TQueryCtx query_ctx
 
   // required in V1
-  4: list<TPlanFragmentCtx> fragment_ctxs
+  4: optional list<TPlanFragmentCtx> fragment_ctxs
 
   // the order corresponds to the order of fragments in fragment_ctxs
   // required in V1
-  5: list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
+  5: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
+
+  // The minimum reservation size (in bytes) required for all fragments in fragment_ctxs.
+  // required in V1
+  6: optional i64 min_reservation_bytes
+
+  // The total of initial reservations (in bytes) that will be claimed over the lifetime
+  // of this query for the fragments in fragment_ctxs.
+  // required in V1
+  7: optional i64 initial_reservation_total_bytes
 }
 
 struct TExecQueryFInstancesResult {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index 766551e..1634dd4 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -60,6 +60,14 @@ struct TPlanFragment {
   // This is distinct from the partitioning of each plan fragment's
   // output, which is specified by output_sink.output_partitioning.
   6: required Partitions.TDataPartition partition
+
+  // The minimum reservation size (in bytes) required for this plan fragment to execute
+  // on a single host.
+  7: optional i64 min_reservation_bytes
+
+  // The total of initial reservations (in bytes) that will be claimed over the lifetime
+  // of this fragment.
+  8: optional i64 initial_reservation_total_bytes
 }
 
 // location information for a single scan range

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 05bcf25..2f6f7de 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -105,10 +105,14 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   // if the output is UNPARTITIONED, it is being broadcast
   private DataPartition outputPartition_;
 
-  // Resource requirements and estimates for all instances of this plan fragment running
-  // on a host. Initialized with a dummy value. Gets set correctly in
+  // Resource requirements and estimates for an instance of this plan fragment.
+  // Initialized with a dummy value. Gets set correctly in
   // computeResourceProfile().
-  private ResourceProfile perHostResourceProfile_ = ResourceProfile.invalid();
+  private ResourceProfile resourceProfile_ = ResourceProfile.invalid();
+
+  // The total of initial reservations (in bytes) that will be claimed over the lifetime
+  // of this fragment. Computed in computeResourceProfile().
+  private long initialReservationTotalBytes_ = -1;
 
   /**
    * C'tor for fragment with specific partition; the output is by default broadcast.
@@ -207,7 +211,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   }
 
   /**
-   * Compute the peak resource profile for all instances of this fragment per host. Must
+   * Compute the peak resource profile for an instance of this fragment. Must
    * be called after all the plan nodes and sinks are added to the fragment and resource
    * profiles of all children fragments are computed.
    */
@@ -222,7 +226,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       // Resource consumption of fragments with join build sinks is included in the
       // parent fragment because the join node blocks waiting for the join build to
       // finish - see JoinNode.computeTreeResourceProfiles().
-      perHostResourceProfile_ = ResourceProfile.invalid();
+      resourceProfile_ = ResourceProfile.invalid();
       return;
     }
 
@@ -231,16 +235,18 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     // The sink is opened after the plan tree.
     ResourceProfile fInstancePostOpenProfile =
         planTreeProfile.postOpenProfile.sum(sink_.getResourceProfile());
-    ResourceProfile fInstanceProfile =
+    resourceProfile_ =
         planTreeProfile.duringOpenProfile.max(fInstancePostOpenProfile);
-    int numInstances = getNumInstancesPerHost(analyzer.getQueryOptions().getMt_dop());
-    perHostResourceProfile_ = fInstanceProfile.multiply(numInstances);
-  }
 
-  public ResourceProfile getPerHostResourceProfile() {
-    return perHostResourceProfile_;
+    initialReservationTotalBytes_ = sink_.getResourceProfile().getMinReservationBytes();
+    for (PlanNode node: collectPlanNodes()) {
+      initialReservationTotalBytes_ +=
+          node.getNodeResourceProfile().getMinReservationBytes();
+    }
   }
 
+  public ResourceProfile getResourceProfile() { return resourceProfile_; }
+
   /**
    * Return the number of nodes on which the plan fragment will execute.
    * invalid: -1
@@ -306,6 +312,14 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     }
     if (sink_ != null) result.setOutput_sink(sink_.toThrift());
     result.setPartition(dataPartition_.toThrift());
+    if (resourceProfile_.isValid()) {
+      Preconditions.checkArgument(initialReservationTotalBytes_ > -1);
+      result.setMin_reservation_bytes(resourceProfile_.getMinReservationBytes());
+      result.setInitial_reservation_total_bytes(initialReservationTotalBytes_);
+    } else {
+      result.setMin_reservation_bytes(0);
+      result.setInitial_reservation_total_bytes(0);
+    }
     return result;
   }
 
@@ -386,7 +400,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     if (sink_ instanceof JoinBuildSink) {
       builder.append("included in parent fragment");
     } else {
-      builder.append(perHostResourceProfile_.getExplainString());
+      builder.append(resourceProfile_.multiply(getNumInstancesPerHost(mt_dop))
+          .getExplainString());
     }
     builder.append("\n");
     return builder.toString();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index ed6e8df..930c354 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -262,9 +262,9 @@ public class Planner {
       TQueryExecRequest request, TExplainLevel explainLevel) {
     StringBuilder str = new StringBuilder();
     boolean hasHeader = false;
-    if (request.query_ctx.isSetPer_host_min_reservation()) {
-      str.append(String.format("Per-Host Resource Reservation: Memory=%s\n",
-          PrintUtils.printBytes(request.query_ctx.getPer_host_min_reservation())));
+    if (request.isSetMax_per_host_min_reservation()) {
+      str.append(String.format("Max Per-Host Resource Reservation: Memory=%s\n",
+          PrintUtils.printBytes(request.getMax_per_host_min_reservation())));
       hasHeader = true;
     }
     if (request.isSetPer_host_mem_estimate()) {
@@ -350,12 +350,10 @@ public class Planner {
     TQueryOptions queryOptions = ctx_.getRootAnalyzer().getQueryOptions();
     int mtDop = queryOptions.getMt_dop();
 
-    // Peak per-host peak resources for all plan fragments.
-    ResourceProfile perHostPeakResources = ResourceProfile.invalid();
-    // Total of initial reservation claims in bytes by all operators in all fragment
-    // instances per host. Computed by summing the per-host minimum reservations of
-    // all plan nodes and sinks.
-    long perHostInitialReservationTotal = 0;
+    // Peak per-host peak resources for all plan fragments, assuming that all fragments
+    // are scheduled on all nodes. The actual per-host resource requirements are computed
+    // after scheduling.
+    ResourceProfile maxPerHostPeakResources = ResourceProfile.invalid();
 
     // Do a pass over all the fragments to compute resource profiles. Compute the
     // profiles bottom-up since a fragment's profile may depend on its descendants.
@@ -367,35 +365,30 @@ public class Planner {
       // Different fragments do not synchronize their Open() and Close(), so the backend
       // does not provide strong guarantees about whether one fragment instance releases
       // resources before another acquires them. Conservatively assume that all fragment
-      // instances can consume their peak resources at the same time, i.e. that the
-      // query-wide peak resources is the sum of the per-fragment-instance peak
-      // resources.
-      perHostPeakResources =
-          perHostPeakResources.sum(fragment.getPerHostResourceProfile());
-      perHostInitialReservationTotal += fragment.getNumInstancesPerHost(mtDop)
-          * fragment.getSink().getResourceProfile().getMinReservationBytes();
-
-      for (PlanNode node: fragment.collectPlanNodes()) {
-        perHostInitialReservationTotal += fragment.getNumInstances(mtDop)
-            * node.getNodeResourceProfile().getMinReservationBytes();
-      }
+      // instances run on all backends with max DOP, and can consume their peak resources
+      // at the same time, i.e. that the query-wide peak resources is the sum of the
+      // per-fragment-instance peak resources.
+      maxPerHostPeakResources = maxPerHostPeakResources.sum(
+          fragment.getResourceProfile().multiply(fragment.getNumInstancesPerHost(mtDop)));
     }
 
-    Preconditions.checkState(perHostPeakResources.getMemEstimateBytes() >= 0,
-        perHostPeakResources.getMemEstimateBytes());
-    Preconditions.checkState(perHostPeakResources.getMinReservationBytes() >= 0,
-        perHostPeakResources.getMinReservationBytes());
+    Preconditions.checkState(maxPerHostPeakResources.getMemEstimateBytes() >= 0,
+        maxPerHostPeakResources.getMemEstimateBytes());
+    Preconditions.checkState(maxPerHostPeakResources.getMinReservationBytes() >= 0,
+        maxPerHostPeakResources.getMinReservationBytes());
 
-    perHostPeakResources = MIN_PER_HOST_RESOURCES.max(perHostPeakResources);
+    maxPerHostPeakResources = MIN_PER_HOST_RESOURCES.max(maxPerHostPeakResources);
 
-    request.setPer_host_mem_estimate(perHostPeakResources.getMemEstimateBytes());
-    queryCtx.setPer_host_min_reservation(perHostPeakResources.getMinReservationBytes());
-    queryCtx.setPer_host_initial_reservation_total_claims(perHostInitialReservationTotal);
+    // TODO: Remove per_host_mem_estimate from the TQueryExecRequest when AC no longer
+    // needs it.
+    request.setPer_host_mem_estimate(maxPerHostPeakResources.getMemEstimateBytes());
+    request.setMax_per_host_min_reservation(
+        maxPerHostPeakResources.getMinReservationBytes());
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Per-host min buffer : " + perHostPeakResources.getMinReservationBytes());
-      LOG.trace(
-          "Estimated per-host memory: " + perHostPeakResources.getMemEstimateBytes());
-      LOG.trace("Per-host initial reservation total: " + perHostInitialReservationTotal);
+      LOG.trace("Max per-host min reservation: " +
+          maxPerHostPeakResources.getMinReservationBytes());
+      LOG.trace("Max estimated per-host memory: " +
+          maxPerHostPeakResources.getMemEstimateBytes());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
index d367424..b68ab20 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
@@ -1,7 +1,7 @@
 # Rows per node is < 3000: codegen should be disabled.
 select count(*) from functional.alltypes
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=148.00MB
 Codegen disabled by planner
 
@@ -21,7 +21,7 @@ PLAN-ROOT SINK
 # Rows per node is > 3000: codegen should be enabled.
 select count(*) from functional.alltypesagg
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=100.00MB
 
 PLAN-ROOT SINK
@@ -40,7 +40,7 @@ PLAN-ROOT SINK
 # No stats on functional_parquet: codegen should be disabled.
 select count(*) from functional_parquet.alltypes
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=36.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypes
@@ -61,7 +61,7 @@ PLAN-ROOT SINK
 # > 3000 rows returned to coordinator: codegen should be enabled
 select * from functional_parquet.alltypes
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=128.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypes
@@ -78,7 +78,7 @@ select count(*)
 from functional.alltypes t1
 join functional.alltypestiny t2 on t1.id = t2.id
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=1.06MB
+Max Per-Host Resource Reservation: Memory=1.06MB
 Per-Host Resource Estimates: Memory=180.00MB
 Codegen disabled by planner
 
@@ -108,7 +108,7 @@ PLAN-ROOT SINK
 # Optimisation is disabled by cross join producing > 3000 rows
 select count(*) from functional.alltypes t1, functional.alltypes t2
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=276.00MB
 
 PLAN-ROOT SINK
@@ -137,7 +137,7 @@ select count(*) from (
   union all
   select * from functional.alltypestiny) v
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=148.00MB
 Codegen disabled by planner
 
@@ -166,7 +166,7 @@ select count(*) from (
   union all
   select * from functional.alltypes) v
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=148.00MB
 
 PLAN-ROOT SINK
@@ -193,7 +193,7 @@ PLAN-ROOT SINK
 select sum(l_discount)
 from (select * from tpch.lineitem limit 1000) v
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=274.00MB
 Codegen disabled by planner
 
@@ -214,7 +214,7 @@ PLAN-ROOT SINK
 select sum(l_discount)
 from (select * from tpch.lineitem where l_orderkey > 100 limit 1000) v
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=274.00MB
 
 PLAN-ROOT SINK

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 90a318e..320b2cd 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -1,7 +1,7 @@
 # Parquet scan
 select * from tpch_parquet.lineitem
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=80.00MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -17,7 +17,7 @@ PLAN-ROOT SINK
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=80.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -39,7 +39,7 @@ Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=160.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -64,7 +64,7 @@ Per-Host Resources: mem-estimate=160.00MB mem-reservation=0B
 # Text scan
 select * from tpch.lineitem;
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=88.00MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -80,7 +80,7 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=88.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -102,7 +102,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=176.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -127,7 +127,7 @@ Per-Host Resources: mem-estimate=176.00MB mem-reservation=0B
 # HBase scan
 select * from functional_hbase.alltypes
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=1.00GB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_hbase.alltypes
@@ -143,7 +143,7 @@ PLAN-ROOT SINK
    mem-estimate=1.00GB mem-reservation=0B
    tuple-ids=0 row-size=88B cardinality=14298
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=1.00GB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_hbase.alltypes
@@ -165,7 +165,7 @@ Per-Host Resources: mem-estimate=1.00GB mem-reservation=0B
    mem-estimate=1.00GB mem-reservation=0B
    tuple-ids=0 row-size=88B cardinality=14298
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=2.00GB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_hbase.alltypes
@@ -190,7 +190,7 @@ Per-Host Resources: mem-estimate=2.00GB mem-reservation=0B
 # Data source scan
 select * from functional.alltypes_datasource
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=1.00GB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional.alltypes_datasource
@@ -204,7 +204,7 @@ PLAN-ROOT SINK
    mem-estimate=1.00GB mem-reservation=0B
    tuple-ids=0 row-size=116B cardinality=5000
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=1.00GB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional.alltypes_datasource
@@ -224,7 +224,7 @@ Per-Host Resources: mem-estimate=1.00GB mem-reservation=0B
    mem-estimate=1.00GB mem-reservation=0B
    tuple-ids=0 row-size=116B cardinality=5000
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=2.00GB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional.alltypes_datasource
@@ -249,7 +249,7 @@ select * from tpch.lineitem
 union all
 select * from tpch.lineitem
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=88.00MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -278,7 +278,7 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=88.00MB
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -313,7 +313,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=176.00MB
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -353,7 +353,7 @@ select l_orderkey, count(*)
 from tpch_parquet.lineitem
 group by l_orderkey
 ---- PLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=106.24MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -375,7 +375,7 @@ PLAN-ROOT SINK
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=8.50MB
+Max Per-Host Resource Reservation: Memory=8.50MB
 Per-Host Resource Estimates: Memory=116.24MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -415,7 +415,7 @@ Per-Host Resources: mem-estimate=106.24MB mem-reservation=0B
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=8.50MB
+Max Per-Host Resource Reservation: Memory=8.50MB
 Per-Host Resource Estimates: Memory=232.48MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -458,7 +458,7 @@ Per-Host Resources: mem-estimate=212.48MB mem-reservation=0B
 # Non-grouping aggregation with zero-slot parquet scan
 select count(*) from tpch_parquet.lineitem
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=90.00MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -479,7 +479,7 @@ PLAN-ROOT SINK
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=100.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -511,7 +511,7 @@ Per-Host Resources: mem-estimate=90.00MB mem-reservation=0B
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=190.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -548,7 +548,7 @@ select *
 from tpch_parquet.lineitem
 order by l_comment
 ---- PLAN
-Per-Host Resource Reservation: Memory=12.00MB
+Max Per-Host Resource Reservation: Memory=12.00MB
 Per-Host Resource Estimates: Memory=120.00MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -569,7 +569,7 @@ PLAN-ROOT SINK
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=12.00MB
+Max Per-Host Resource Reservation: Memory=12.00MB
 Per-Host Resource Estimates: Memory=120.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -597,7 +597,7 @@ Per-Host Resources: mem-estimate=120.00MB mem-reservation=12.00MB
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=24.00MB
+Max Per-Host Resource Reservation: Memory=24.00MB
 Per-Host Resource Estimates: Memory=240.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -631,7 +631,7 @@ from tpch_parquet.lineitem
 order by l_comment
 limit 100
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=80.03MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -652,7 +652,7 @@ PLAN-ROOT SINK
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=80.03MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -681,7 +681,7 @@ Per-Host Resources: mem-estimate=80.03MB mem-reservation=0B
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=160.05MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -714,7 +714,7 @@ Per-Host Resources: mem-estimate=160.05MB mem-reservation=0B
 select *
 from tpch.lineitem inner join tpch.orders on l_orderkey = o_orderkey
 ---- PLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=476.41MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -746,7 +746,7 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=476.41MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -790,7 +790,7 @@ Per-Host Resources: mem-estimate=388.41MB mem-reservation=34.00MB
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=68.00MB
+Max Per-Host Resource Reservation: Memory=68.00MB
 Per-Host Resource Estimates: Memory=952.83MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -846,7 +846,7 @@ Per-Host Resources: mem-estimate=776.83MB mem-reservation=68.00MB
 select *
 from tpch.lineitem inner join /* +shuffle */ tpch.orders on l_orderkey = o_orderkey
 ---- PLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=476.41MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -878,7 +878,7 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=276.14MB
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -928,7 +928,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=68.00MB
+Max Per-Host Resource Reservation: Memory=68.00MB
 Per-Host Resource Estimates: Memory=452.14MB
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -990,7 +990,7 @@ Per-Host Resources: mem-estimate=176.00MB mem-reservation=0B
 select *
 from tpch.lineitem, tpch.orders
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=449.10MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1018,7 +1018,7 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=449.10MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1058,7 +1058,7 @@ Per-Host Resources: mem-estimate=361.10MB mem-reservation=0B
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=898.21MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1108,7 +1108,7 @@ Per-Host Resources: mem-estimate=722.21MB mem-reservation=0B
 # Empty set node
 select * from functional.alltypes where 1 = 2
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=10.00MB
 Codegen disabled by planner
 
@@ -1121,7 +1121,7 @@ PLAN-ROOT SINK
    mem-estimate=0B mem-reservation=0B
    tuple-ids=0 row-size=0B cardinality=0
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=10.00MB
 Codegen disabled by planner
 
@@ -1134,7 +1134,7 @@ PLAN-ROOT SINK
    mem-estimate=0B mem-reservation=0B
    tuple-ids=0 row-size=0B cardinality=0
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=10.00MB
 Codegen disabled by planner
 
@@ -1151,7 +1151,7 @@ PLAN-ROOT SINK
 select max(tinyint_col) over(partition by int_col)
 from functional.alltypes
 ---- PLAN
-Per-Host Resource Reservation: Memory=10.00MB
+Max Per-Host Resource Reservation: Memory=10.00MB
 Per-Host Resource Estimates: Memory=18.00MB
 Codegen disabled by planner
 
@@ -1179,7 +1179,7 @@ PLAN-ROOT SINK
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=5B cardinality=7300
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=10.00MB
+Max Per-Host Resource Reservation: Memory=10.00MB
 Per-Host Resource Estimates: Memory=18.00MB
 Codegen disabled by planner
 
@@ -1219,7 +1219,7 @@ Per-Host Resources: mem-estimate=16.00MB mem-reservation=0B
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=5B cardinality=7300
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=20.00MB
+Max Per-Host Resource Reservation: Memory=20.00MB
 Per-Host Resource Estimates: Memory=36.00MB
 Codegen disabled by planner
 
@@ -1266,7 +1266,7 @@ select *, row_number() over (order by o_totalprice) rnum_price,
   row_number() over (order by o_orderpriority) rnum_priority
 from tpch_parquet.orders
 ---- PLAN
-Per-Host Resource Reservation: Memory=36.00MB
+Max Per-Host Resource Reservation: Memory=36.00MB
 Per-Host Resource Estimates: Memory=58.00MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1318,7 +1318,7 @@ PLAN-ROOT SINK
    mem-estimate=40.00MB mem-reservation=0B
    tuple-ids=0 row-size=191B cardinality=1500000
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=44.00MB
+Max Per-Host Resource Reservation: Memory=44.00MB
 Per-Host Resource Estimates: Memory=94.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1377,7 +1377,7 @@ Per-Host Resources: mem-estimate=58.00MB mem-reservation=12.00MB
    mem-estimate=40.00MB mem-reservation=0B
    tuple-ids=0 row-size=191B cardinality=1500000
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=56.00MB
+Max Per-Host Resource Reservation: Memory=56.00MB
 Per-Host Resource Estimates: Memory=152.00MB
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1449,7 +1449,7 @@ select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
 from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
 where l_shipmode = 'F'
 ---- PLAN
-Per-Host Resource Reservation: Memory=51.00MB
+Max Per-Host Resource Reservation: Memory=51.00MB
 Per-Host Resource Estimates: Memory=135.17MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1548,7 +1548,7 @@ PLAN-ROOT SINK
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=78B cardinality=600122
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=38.25MB
+Max Per-Host Resource Reservation: Memory=38.25MB
 Per-Host Resource Estimates: Memory=339.36MB
 
 F09:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1688,7 +1688,7 @@ Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=78B cardinality=600122
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=72.25MB
+Max Per-Host Resource Reservation: Memory=72.25MB
 Per-Host Resource Estimates: Memory=674.53MB
 
 F09:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1888,7 +1888,7 @@ order by
   o_orderdate
 limit 100
 ---- PLAN
-Per-Host Resource Reservation: Memory=80.75MB
+Max Per-Host Resource Reservation: Memory=80.75MB
 Per-Host Resource Estimates: Memory=391.29MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1968,7 +1968,7 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=2 row-size=16B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=82.88MB
+Max Per-Host Resource Reservation: Memory=82.88MB
 Per-Host Resource Estimates: Memory=500.32MB
 
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -2098,7 +2098,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=2 row-size=16B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=121.12MB
+Max Per-Host Resource Reservation: Memory=121.12MB
 Per-Host Resource Estimates: Memory=953.10MB
 
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -2259,7 +2259,7 @@ float_col, double_col, date_string_col, string_col, timestamp_col
 from functional.alltypes
 where year=2009 and month=05
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=16.03MB
 Codegen disabled by planner
 
@@ -2277,7 +2277,7 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=89B cardinality=310
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=16.03MB
 Codegen disabled by planner
 
@@ -2295,7 +2295,7 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=89B cardinality=310
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=32.03MB
 Codegen disabled by planner
 
@@ -2318,7 +2318,7 @@ create table dummy_insert
 partitioned by (l_partkey) as
 select l_comment, l_partkey from tpch.lineitem
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=376.99MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -2335,7 +2335,7 @@ WRITE TO HDFS [default.dummy_insert, OVERWRITE=false, PARTITION-KEYS=(l_partkey)
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=50B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=184.33MB
 
 F01:PLAN FRAGMENT [HASH(l_partkey)] hosts=3 instances=3
@@ -2358,7 +2358,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=50B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=272.33MB
 
 F01:PLAN FRAGMENT [HASH(l_partkey)] hosts=3 instances=6
@@ -2390,7 +2390,7 @@ from tpch_nested_parquet.customer c,
    join c.c_orders o2 on o1.o_orderkey = o2.o_orderkey
    order by o1.o_orderkey limit 100) v
 ---- PLAN
-Per-Host Resource Reservation: Memory=69.06MB
+Max Per-Host Resource Reservation: Memory=69.06MB
 Per-Host Resource Estimates: Memory=344.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 tpch_nested_parquet.customer
@@ -2452,7 +2452,7 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=66B cardinality=150000
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=69.06MB
+Max Per-Host Resource Reservation: Memory=69.06MB
 Per-Host Resource Estimates: Memory=472.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 tpch_nested_parquet.customer
@@ -2531,7 +2531,7 @@ Per-Host Resources: mem-estimate=344.00MB mem-reservation=35.06MB
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=66B cardinality=150000
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=138.12MB
+Max Per-Host Resource Reservation: Memory=138.12MB
 Per-Host Resource Estimates: Memory=944.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 tpch_nested_parquet.customer
@@ -2619,7 +2619,7 @@ from tpch_nested_parquet.customer c,
     row_number() over (order by o_orderpriority) rnum_priority
    from c.c_orders) v;
 ---- PLAN
-Per-Host Resource Reservation: Memory=48.00MB
+Max Per-Host Resource Reservation: Memory=48.00MB
 Per-Host Resource Estimates: Memory=94.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 tpch_nested_parquet.customer
@@ -2691,7 +2691,7 @@ PLAN-ROOT SINK
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=254B cardinality=150000
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=48.00MB
+Max Per-Host Resource Reservation: Memory=48.00MB
 Per-Host Resource Estimates: Memory=94.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 tpch_nested_parquet.customer
@@ -2769,7 +2769,7 @@ Per-Host Resources: mem-estimate=94.00MB mem-reservation=48.00MB
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=254B cardinality=150000
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=96.00MB
+Max Per-Host Resource Reservation: Memory=96.00MB
 Per-Host Resource Estimates: Memory=188.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 tpch_nested_parquet.customer
@@ -2861,7 +2861,7 @@ join (
   ) v2 on v2.k3 = t2.o_orderkey
 ) v1 on v1.k3 = t1.o_orderkey
 ---- PLAN
-Per-Host Resource Reservation: Memory=68.00MB
+Max Per-Host Resource Reservation: Memory=68.00MB
 Per-Host Resource Estimates: Memory=172.59MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -2925,7 +2925,7 @@ PLAN-ROOT SINK
    mem-estimate=40.00MB mem-reservation=0B
    tuple-ids=0 row-size=191B cardinality=1500000
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=59.50MB
+Max Per-Host Resource Reservation: Memory=59.50MB
 Per-Host Resource Estimates: Memory=216.65MB
 
 F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -3019,7 +3019,7 @@ Per-Host Resources: mem-estimate=77.77MB mem-reservation=34.00MB
    mem-estimate=40.00MB mem-reservation=0B
    tuple-ids=0 row-size=191B cardinality=1500000
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=93.50MB
+Max Per-Host Resource Reservation: Memory=93.50MB
 Per-Host Resource Estimates: Memory=414.41MB
 
 F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -3153,7 +3153,7 @@ join (
   ) v2
 ) v1
 ---- PLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=137.99MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -3205,7 +3205,7 @@ PLAN-ROOT SINK
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=117B cardinality=25
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=137.99MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -3281,7 +3281,7 @@ Per-Host Resources: mem-estimate=87.53MB mem-reservation=0B
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=117B cardinality=25
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=0B
+Max Per-Host Resource Reservation: Memory=0B
 Per-Host Resource Estimates: Memory=275.97MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -3387,7 +3387,7 @@ sum(smallint_col) over (partition by tinyint_col order by smallint_col
                                                 rows between 1 following and 2 following)
                                                 from functional.alltypesagg
 ---- PLAN
-Per-Host Resource Reservation: Memory=18.00MB
+Max Per-Host Resource Reservation: Memory=18.00MB
 Per-Host Resource Estimates: Memory=18.00MB
 Codegen disabled by planner
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
index 920195b..43a297b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
@@ -3,7 +3,7 @@ select straight_join *
 from tpch_parquet.customer
     inner join tpch_parquet.nation on c_nationkey = n_nationkey
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=1.06MB
+Max Per-Host Resource Reservation: Memory=1.06MB
 Per-Host Resource Estimates: Memory=40.00MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -47,7 +47,7 @@ Per-Host Resources: mem-estimate=24.00MB mem-reservation=1.06MB
    mem-estimate=24.00MB mem-reservation=0B
    tuple-ids=0 row-size=238B cardinality=150000
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=2.12MB
+Max Per-Host Resource Reservation: Memory=2.12MB
 Per-Host Resource Estimates: Memory=80.01MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -104,7 +104,7 @@ select straight_join *
 from tpch_parquet.lineitem
     left join tpch_parquet.orders on l_orderkey = o_orderkey
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=420.41MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -146,7 +146,7 @@ Per-Host Resources: mem-estimate=380.41MB mem-reservation=34.00MB
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=68.00MB
+Max Per-Host Resource Reservation: Memory=68.00MB
 Per-Host Resource Estimates: Memory=840.83MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -201,7 +201,7 @@ select straight_join *
 from tpch_parquet.orders
     join /*+shuffle*/ tpch_parquet.customer on o_custkey = c_custkey
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=82.69MB
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -251,7 +251,7 @@ Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
    mem-estimate=40.00MB mem-reservation=0B
    tuple-ids=0 row-size=191B cardinality=1500000
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=146.69MB
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -314,7 +314,7 @@ select straight_join *
 from tpch_parquet.orders
     join /*+broadcast*/ tpch_parquet.customer on o_custkey = c_custkey
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=101.38MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -358,7 +358,7 @@ Per-Host Resources: mem-estimate=77.38MB mem-reservation=34.00MB
    mem-estimate=40.00MB mem-reservation=0B
    tuple-ids=0 row-size=191B cardinality=1500000
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=68.00MB
+Max Per-Host Resource Reservation: Memory=68.00MB
 Per-Host Resource Estimates: Memory=202.76MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -415,7 +415,7 @@ select straight_join *
 from functional_parquet.alltypes
     left join functional_parquet.alltypestiny on alltypes.id = alltypestiny.id
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=2.03GB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypes, functional_parquet.alltypestiny
@@ -459,7 +459,7 @@ Per-Host Resources: mem-estimate=2.02GB mem-reservation=34.00MB
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=88B cardinality=unavailable
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=68.00MB
+Max Per-Host Resource Reservation: Memory=68.00MB
 Per-Host Resource Estimates: Memory=4.06GB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypestiny
@@ -516,7 +516,7 @@ select c_nationkey, avg(c_acctbal)
 from tpch_parquet.customer
 group by c_nationkey
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=1.12MB
+Max Per-Host Resource Reservation: Memory=1.12MB
 Per-Host Resource Estimates: Memory=44.00MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -556,7 +556,7 @@ Per-Host Resources: mem-estimate=34.00MB mem-reservation=0B
    mem-estimate=24.00MB mem-reservation=0B
    tuple-ids=0 row-size=10B cardinality=150000
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=2.25MB
+Max Per-Host Resource Reservation: Memory=2.25MB
 Per-Host Resource Estimates: Memory=88.00MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -603,7 +603,7 @@ from tpch_parquet.lineitem
 group by 1, 2
 having count(*) = 1
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=51.00MB
+Max Per-Host Resource Reservation: Memory=51.00MB
 Per-Host Resource Estimates: Memory=205.28MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -672,7 +672,7 @@ Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=51.00MB
+Max Per-Host Resource Reservation: Memory=51.00MB
 Per-Host Resource Estimates: Memory=327.24MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -753,7 +753,7 @@ Per-Host Resources: mem-estimate=160.00MB mem-reservation=0B
 select distinct *
 from tpch_parquet.lineitem
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=3.31GB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -791,7 +791,7 @@ Per-Host Resources: mem-estimate=1.69GB mem-reservation=0B
    mem-estimate=80.00MB mem-reservation=0B
    tuple-ids=0 row-size=263B cardinality=6001215
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=68.00MB
+Max Per-Host Resource Reservation: Memory=68.00MB
 Per-Host Resource Estimates: Memory=6.62GB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -834,7 +834,7 @@ select string_col, count(*)
 from functional_parquet.alltypestiny
 group by string_col
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=34.00MB
+Max Per-Host Resource Reservation: Memory=34.00MB
 Per-Host Resource Estimates: Memory=272.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypestiny
@@ -876,7 +876,7 @@ Per-Host Resources: mem-estimate=144.00MB mem-reservation=0B
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=16B cardinality=unavailable
 ---- PARALLELPLANS
-Per-Host Resource Reservation: Memory=68.00MB
+Max Per-Host Resource Reservation: Memory=68.00MB
 Per-Host Resource Estimates: Memory=544.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypestiny

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/testdata/workloads/functional-query/queries/QueryTest/explain-level0.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level0.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level0.test
index 122d928..9ee1a65 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level0.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level0.test
@@ -5,7 +5,7 @@ explain
 select *
 from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Per-Host Resource Reservation: Memory=34.00MB'
+'Max Per-Host Resource Reservation: Memory=34.00MB'
 'Per-Host Resource Estimates: Memory=476.41MB'
 ''
 'PLAN-ROOT SINK'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
index 475758d..64b11e6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
@@ -5,7 +5,7 @@ explain
 select *
 from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Per-Host Resource Reservation: Memory=34.00MB'
+'Max Per-Host Resource Reservation: Memory=34.00MB'
 'Per-Host Resource Estimates: Memory=476.41MB'
 ''
 'PLAN-ROOT SINK'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
index 2fa7576..1ade744 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
@@ -5,7 +5,7 @@ explain
 select *
 from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Per-Host Resource Reservation: Memory=34.00MB'
+'Max Per-Host Resource Reservation: Memory=34.00MB'
 'Per-Host Resource Estimates: Memory=476.41MB'
 ''
 'F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
index 76d74ce..80c0d8f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level3.test
@@ -5,7 +5,7 @@ explain
 select *
 from tpch.lineitem join tpch.orders on l_orderkey = o_orderkey;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Per-Host Resource Reservation: Memory=34.00MB'
+'Max Per-Host Resource Reservation: Memory=34.00MB'
 'Per-Host Resource Estimates: Memory=476.41MB'
 ''
 'F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index b31bacf..58b9c4a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -27,7 +27,7 @@ STRING
 # Stats are available now.
 explain select id from alltypes;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Per-Host Resource Reservation: Memory=0B'
+'Max Per-Host Resource Reservation: Memory=0B'
 'Per-Host Resource Estimates: Memory=16.00MB'
 'Codegen disabled by planner'
 ''
@@ -50,7 +50,7 @@ STRING
 # Select a subset of partitions.
 explain select id from alltypes where month in (1, 2, 3);
 ---- RESULTS: VERIFY_IS_EQUAL
-'Per-Host Resource Reservation: Memory=0B'
+'Max Per-Host Resource Reservation: Memory=0B'
 'Per-Host Resource Estimates: Memory=16.00MB'
 'Codegen disabled by planner'
 ''
@@ -75,7 +75,7 @@ insert into alltypes partition(year, month)
 select * from functional_parquet.alltypes where year = 2009;
 explain select id from alltypes;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Per-Host Resource Reservation: Memory=0B'
+'Max Per-Host Resource Reservation: Memory=0B'
 'Per-Host Resource Estimates: Memory=16.00MB'
 ''
 'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1'
@@ -99,7 +99,7 @@ insert into alltypes partition(year, month)
 select * from functional_parquet.alltypes where year = 2010;
 explain select id from alltypes where year = 2010;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Per-Host Resource Reservation: Memory=0B'
+'Max Per-Host Resource Reservation: Memory=0B'
 'Per-Host Resource Estimates: Memory=16.00MB'
 'Codegen disabled by planner'
 ''
@@ -123,7 +123,7 @@ STRING
 compute stats alltypes;
 explain select id from alltypes where year = 2010;
 ---- RESULTS: VERIFY_IS_EQUAL
-'Per-Host Resource Reservation: Memory=0B'
+'Max Per-Host Resource Reservation: Memory=0B'
 'Per-Host Resource Estimates: Memory=16.00MB'
 'Codegen disabled by planner'
 ''

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/tests/custom_cluster/test_coordinators.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index cac4512..4d0b814 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -114,7 +114,7 @@ class TestCoordinators(CustomClusterTestSuite):
     exec_and_verify_num_executors(3)
     # Stop the cluster
     self._stop_impala_cluster()
-    # Cluster config where the oordinator can only execute coordinator fragments
+    # Cluster config where the coordinator can only execute coordinator fragments
     self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
         use_exclusive_coordinators=True)
     exec_and_verify_num_executors(2)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6c125465/tests/custom_cluster/test_mem_reservations.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_mem_reservations.py b/tests/custom_cluster/test_mem_reservations.py
new file mode 100644
index 0000000..d8a19e8
--- /dev/null
+++ b/tests/custom_cluster/test_mem_reservations.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import threading
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestMemReservations(CustomClusterTestSuite):
+  """Tests for memory reservations that require custom cluster arguments."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="--buffer_pool_limit=2g")
+  def test_per_backend_min_reservation(self, vector):
+    """Tests that the per-backend minimum reservations are used (IMPALA-4833).
+       The test sets the buffer_pool_limit very low (2 GB) and then runs a query against
+       two different coordinators. The query is constructed to have different minimum
+       reservation requirements on the coordinator node and on the other backends. If the
+       per-backend minimum reservations are not used, one of the queries fails to
+       acquire its minimum reservation. This was verified to fail before IMPALA-4833 and
+       to succeed after it.
+    """
+    assert len(self.cluster.impalads) == 3
+
+    # This query will have scan fragments on all nodes, but the coordinator fragment
+    # has 6 analytic nodes, 5 sort nodes, and an aggregation.
+    QUERY = """
+    select max(t.c1), avg(t.c2), min(t.c3), avg(c4), avg(c5), avg(c6)
+    from (select
+        max(tinyint_col) over (order by int_col) c1,
+        avg(tinyint_col) over (order by smallint_col) c2,
+        min(tinyint_col) over (order by smallint_col desc) c3,
+        rank() over (order by int_col desc) c4,
+        dense_rank() over (order by bigint_col) c5,
+        first_value(tinyint_col) over (order by bigint_col desc) c6
+        from functional.alltypes) t;
+        """
+
+    # Set the DEFAULT_SPILLABLE_BUFFER_SIZE and MIN_SPILLABLE_BUFFER_SIZE to 64 MiB
+    # so that the coordinator node requires ~1.2 GB and the other backends require ~200 MB.
+    CONFIG_MAP = {'DEFAULT_SPILLABLE_BUFFER_SIZE': '67108864',
+                  'MIN_SPILLABLE_BUFFER_SIZE': '67108864'}
+
+    # Create two threads to submit QUERY to two different coordinators concurrently.
+    class QuerySubmitThread(threading.Thread):
+      def __init__(self, coordinator):
+        super(QuerySubmitThread, self).__init__()
+        self.coordinator = coordinator
+        self.error = None
+
+      def run(self):
+        client = self.coordinator.service.create_beeswax_client()
+        try:
+          client.set_configuration(CONFIG_MAP)
+          for i in xrange(20):
+            result = client.execute(QUERY)
+            assert result.success
+            assert len(result.data) == 1
+        except Exception as e:
+          self.error = str(e)
+        finally:
+          client.close()
+
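+    # Run one submitter against each of the first two coordinators and fail the test
+    # if either thread hit an error.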
+    threads = [QuerySubmitThread(self.cluster.impalads[i]) for i in xrange(2)]
+    for t in threads: t.start()
+    for t in threads:
+      t.join()
+      assert t.error is None