You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2018/10/05 21:39:15 UTC

[3/8] impala git commit: IMPALA-7349: Add Admission control support for automatically setting per host memory limit for a query

IMPALA-7349: Add Admission control support for automatically setting
per host memory limit for a query

With this patch the per host memory limit of a query is automatically
set using the mem_limit set in the query options and the mem_estimate
calculated by the planner based on the following pseudo code:

if mem_limit is set in query options:
  use that and if 'clamp-mem-limit-query-option' is true:
    enforce the min/max query mem limits defined in the pool config.
else:
  mem_limit = max(mem_estiamte,
    min_mem_limit_required_to_accomodate_largest_initial_reservation)
  finally, enforce min/max query mem limits defined in the pool
  config on this value.

This calculated mem limit will also be used for admission accounting
and consequently for admission control. Moreover, three new pool
configuration options have been added to enable this behaviour:

"min-query-mem-limit" & "max-query-mem-limit" => help
clamp the per host memory limit for a query. If both these limits
are not configured, then the estimates from planning are not used
as a memory limit and only used for making admission decisions.
Moreover the estimates will no longer have a lower bound based
on the largest initial reservation.

"clamp-mem-limit-query-option" => if false, the mem_limit defined in
the query options is used directly and the max/min query mem limits
are not enforced on it.

Testing:
Added e2e test cases.
Added frontend tests for changes to RequestPoolService.
Successfully passed exhaustive tests.

Change-Id: Ifec00141651982f5975803c2165b7d7a10ebeaa6
Reviewed-on: http://gerrit.cloudera.org:8080/11157
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fc91e706
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fc91e706
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fc91e706

Branch: refs/heads/master
Commit: fc91e706b4f3b45cdda28d977f652cee3f050e7b
Parents: c80c62f
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Wed Jul 25 18:35:47 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 5 04:38:24 2018 +0000

----------------------------------------------------------------------
 .../benchmarks/process-wide-locks-benchmark.cc  |   2 +-
 be/src/runtime/coordinator-backend-state.cc     |   1 +
 be/src/runtime/coordinator.cc                   |   3 +-
 be/src/runtime/mem-tracker.cc                   |  16 +-
 be/src/runtime/mem-tracker.h                    |  12 +-
 be/src/runtime/query-exec-mgr.cc                |  12 +-
 be/src/runtime/query-exec-mgr.h                 |   5 +-
 be/src/runtime/query-state.cc                   |  18 +-
 be/src/runtime/query-state.h                    |  13 +-
 be/src/runtime/runtime-state.cc                 |   9 +-
 be/src/runtime/test-env.cc                      |   7 +-
 be/src/scheduling/admission-controller.cc       | 204 +++++++++++------
 be/src/scheduling/admission-controller.h        | 141 ++++++++----
 be/src/scheduling/query-schedule.cc             |  85 ++++---
 be/src/scheduling/query-schedule.h              |  57 ++++-
 be/src/scheduling/request-pool-service.cc       |   3 +
 be/src/scheduling/scheduler.cc                  |   4 +
 be/src/service/client-request-state.cc          |   5 +-
 common/thrift/ImpalaInternalService.thrift      |  18 ++
 common/thrift/metrics.json                      |  30 +++
 .../apache/impala/util/RequestPoolService.java  |  41 +++-
 .../impala/util/TestRequestPoolService.java     |  19 +-
 fe/src/test/resources/fair-scheduler-test.xml   |   4 +
 fe/src/test/resources/llama-site-test.xml       |  12 +
 .../resources/mem-limit-test-fair-scheduler.xml |  46 ++++
 .../resources/mem-limit-test-llama-site.xml     |  88 +++++++
 .../QueryTest/admission-max-min-mem-limits.test | 153 +++++++++++++
 .../admission-reject-min-reservation.test       |  12 +-
 tests/common/resource_pool_config.py            |  96 ++++++++
 .../custom_cluster/test_admission_controller.py | 229 ++++++++++++++-----
 30 files changed, 1077 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/benchmarks/process-wide-locks-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/process-wide-locks-benchmark.cc b/be/src/benchmarks/process-wide-locks-benchmark.cc
index 465bb00..373f5b7 100644
--- a/be/src/benchmarks/process-wide-locks-benchmark.cc
+++ b/be/src/benchmarks/process-wide-locks-benchmark.cc
@@ -84,7 +84,7 @@ void CreateAndAccessQueryStates(const TUniqueId& query_id, int num_accesses) {
   query_ctx.__set_request_pool(resolved_pool);
 
   QueryState *query_state;
-  query_state = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx);
+  query_state = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx, -1);
   DCHECK(query_state != nullptr);
   query_state->AcquireBackendResourceRefcount();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 95484a4..9acab79 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -84,6 +84,7 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
       backend_exec_params_->min_mem_reservation_bytes);
   rpc_params->__set_initial_mem_reservation_total_claims(
       backend_exec_params_->initial_mem_reservation_total_claims);
+  rpc_params->__set_per_backend_mem_limit(coord_.schedule_.per_backend_mem_limit());
 
   // set fragment_ctxs and fragment_instance_ctxs
   rpc_params->__isset.fragment_ctxs = true;

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 04f9392..eb9fe81 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -107,7 +107,8 @@ Status Coordinator::Exec() {
   bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0;
   if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
 
-  query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx());
+  query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
+      query_ctx(), schedule_.per_backend_mem_limit());
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index d204ce8..a2e2295 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -209,26 +209,22 @@ MemTracker* PoolMemTrackerRegistry::GetRequestPoolMemTracker(
 }
 
 MemTracker* MemTracker::CreateQueryMemTracker(const TUniqueId& id,
-    const TQueryOptions& query_options, const string& pool_name, ObjectPool* obj_pool) {
-  int64_t byte_limit = -1;
-  if (query_options.__isset.mem_limit && query_options.mem_limit > 0) {
-    byte_limit = query_options.mem_limit;
-  }
-  if (byte_limit != -1) {
-    if (byte_limit > MemInfo::physical_mem()) {
-      LOG(WARNING) << "Memory limit " << PrettyPrinter::Print(byte_limit, TUnit::BYTES)
+    int64_t mem_limit, const string& pool_name, ObjectPool* obj_pool) {
+  if (mem_limit != -1) {
+    if (mem_limit > MemInfo::physical_mem()) {
+      LOG(WARNING) << "Memory limit " << PrettyPrinter::Print(mem_limit, TUnit::BYTES)
                    << " exceeds physical memory of "
                    << PrettyPrinter::Print(MemInfo::physical_mem(), TUnit::BYTES);
     }
     VLOG(2) << "Using query memory limit: "
-            << PrettyPrinter::Print(byte_limit, TUnit::BYTES);
+            << PrettyPrinter::Print(mem_limit, TUnit::BYTES);
   }
 
   MemTracker* pool_tracker =
       ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(
           pool_name, true);
   MemTracker* tracker = obj_pool->Add(new MemTracker(
-      byte_limit, Substitute("Query($0)", PrintId(id)), pool_tracker));
+      mem_limit, Substitute("Query($0)", PrintId(id)), pool_tracker));
   tracker->is_query_mem_tracker_ = true;
   tracker->query_id_ = id;
   return tracker;

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index d4b4dd4..756a22a 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -120,13 +120,11 @@ class MemTracker {
   /// The counters should be owned by the fragment's RuntimeProfile.
   void EnableReservationReporting(const ReservationTrackerCounters& counters);
 
-  /// Construct a MemTracker object for query 'id'. The query limits are determined based
-  /// on 'query_options'. The MemTracker is a child of the request pool MemTracker for
-  /// 'pool_name', which is created if needed. The returned MemTracker is owned by
-  /// 'obj_pool'.
-  static MemTracker* CreateQueryMemTracker(const TUniqueId& id,
-      const TQueryOptions& query_options, const std::string& pool_name,
-      ObjectPool* obj_pool);
+  /// Construct a MemTracker object for query 'id' with 'mem_limit' as the memory limit.
+  /// The MemTracker is a child of the request pool MemTracker for 'pool_name', which is
+  /// created if needed. The returned MemTracker is owned by 'obj_pool'.
+  static MemTracker* CreateQueryMemTracker(const TUniqueId& id, int64_t mem_limit,
+      const std::string& pool_name, ObjectPool* obj_pool);
 
   /// Increases consumption of this tracker and its ancestors by 'bytes'.
   void Consume(int64_t bytes) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 4e1a340..1eca80e 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -47,7 +47,8 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
           << " coord=" << TNetworkAddressToString(params.query_ctx.coord_address);
 
   bool dummy;
-  QueryState* qs = GetOrCreateQueryState(params.query_ctx, &dummy);
+  QueryState* qs =
+      GetOrCreateQueryState(params.query_ctx, params.per_backend_mem_limit, &dummy);
   Status status = qs->Init(params);
   if (!status.ok()) {
     qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
@@ -71,9 +72,10 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
   return Status::OK();
 }
 
-QueryState* QueryExecMgr::CreateQueryState(const TQueryCtx& query_ctx) {
+QueryState* QueryExecMgr::CreateQueryState(
+    const TQueryCtx& query_ctx, int64_t mem_limit) {
   bool created;
-  QueryState* qs = GetOrCreateQueryState(query_ctx, &created);
+  QueryState* qs = GetOrCreateQueryState(query_ctx, mem_limit, &created);
   DCHECK(created);
   return qs;
 }
@@ -97,7 +99,7 @@ QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
 }
 
 QueryState* QueryExecMgr::GetOrCreateQueryState(
-    const TQueryCtx& query_ctx, bool* created) {
+    const TQueryCtx& query_ctx, int64_t mem_limit, bool* created) {
   QueryState* qs = nullptr;
   int refcnt;
   {
@@ -108,7 +110,7 @@ QueryState* QueryExecMgr::GetOrCreateQueryState(
     auto it = map_ref->find(query_ctx.query_id);
     if (it == map_ref->end()) {
       // register new QueryState
-      qs = new QueryState(query_ctx);
+      qs = new QueryState(query_ctx, mem_limit);
       map_ref->insert(make_pair(query_ctx.query_id, qs));
       *created = true;
     } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index bddd731..262ef59 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -54,7 +54,7 @@ class QueryExecMgr : public CacheLineAligned {
   /// Creates a QueryState for the given query with the provided parameters. Only valid
   /// to call if the QueryState does not already exist. The caller must call
   /// ReleaseQueryState() with the returned QueryState to decrement the refcount.
-  QueryState* CreateQueryState(const TQueryCtx& query_ctx);
+  QueryState* CreateQueryState(const TQueryCtx& query_ctx, int64_t mem_limit);
 
   /// If a QueryState for the given query exists, increments that refcount and returns
   /// the QueryState, otherwise returns nullptr.
@@ -71,7 +71,8 @@ class QueryExecMgr : public CacheLineAligned {
   /// Gets the existing QueryState or creates a new one if not present.
   /// 'created' is set to true if it was created, false otherwise.
   /// Increments the refcount.
-  QueryState* GetOrCreateQueryState(const TQueryCtx& query_ctx, bool* created);
+  QueryState* GetOrCreateQueryState(
+      const TQueryCtx& query_ctx, int64_t mem_limit, bool* created);
 
   /// Execute instances and decrement refcount (acquire ownership of qs).
   void StartQueryHelper(QueryState* qs);

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 1dc41dc..12710bf 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -55,7 +55,8 @@ QueryState::ScopedRef::~ScopedRef() {
   ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
 }
 
-QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
+QueryState::QueryState(
+    const TQueryCtx& query_ctx, int64_t mem_limit, const string& request_pool)
   : query_ctx_(query_ctx),
     backend_resource_refcnt_(0),
     refcnt_(0),
@@ -77,7 +78,8 @@ QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
   if (query_options.batch_size <= 0) {
     query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
   }
-  InitMemTrackers();
+  query_mem_tracker_ = MemTracker::CreateQueryMemTracker(
+      query_id(), mem_limit, query_ctx_.request_pool, &obj_pool_);
 }
 
 void QueryState::ReleaseBackendResources() {
@@ -154,18 +156,6 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   return Status::OK();
 }
 
-void QueryState::InitMemTrackers() {
-  const string& pool = query_ctx_.request_pool;
-  int64_t bytes_limit = -1;
-  if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
-    bytes_limit = query_options().mem_limit;
-    VLOG(2) << "Using query memory limit from query options: "
-            << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
-  }
-  query_mem_tracker_ =
-      MemTracker::CreateQueryMemTracker(query_id(), query_options(), pool, &obj_pool_);
-}
-
 Status QueryState::InitBufferPoolState() {
   ExecEnv* exec_env = ExecEnv::GetInstance();
   int64_t mem_limit = query_mem_tracker_->GetLowestLimit(MemLimit::HARD);

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 5cb499a..9810156 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -390,17 +390,16 @@ class QueryState {
   /// StartFInstances().
   int64_t fragment_events_start_time_ = 0;
 
-  /// Create QueryState w/ refcnt of 0.
-  /// The query is associated with the resource pool query_ctx.request_pool or
-  /// 'request_pool', if the former is not set (needed for tests).
-  QueryState(const TQueryCtx& query_ctx, const std::string& request_pool = "");
+  /// Create QueryState w/ a refcnt of 0 and a memory limit of 'mem_limit' bytes applied
+  /// to the query mem tracker. The query is associated with the resource pool set in
+  /// 'query_ctx.request_pool' or from 'request_pool', if the former is not set (needed
+  /// for tests).
+  QueryState(const TQueryCtx& query_ctx, int64_t mem_limit,
+      const std::string& request_pool = "");
 
   /// Execute the fragment instance and decrement the refcnt when done.
   void ExecFInstance(FragmentInstanceState* fis);
 
-  /// Called from constructor to initialize MemTrackers.
-  void InitMemTrackers();
-
   /// Called from Init() to set up buffer reservations and the file group.
   Status InitBufferPoolState() WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 385087c..127abb4 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -80,10 +80,15 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
 }
 
 // Constructor for standalone RuntimeState for test execution and fe-support.cc.
-// Sets up a dummy local QueryState to allow evaluating exprs, etc.
+// Sets up a dummy local QueryState (with mem_limit picked up from the query options)
+// to allow evaluating exprs, etc.
 RuntimeState::RuntimeState(
     const TQueryCtx& qctx, ExecEnv* exec_env, DescriptorTbl* desc_tbl)
-  : query_state_(new QueryState(qctx, "test-pool")),
+  : query_state_(new QueryState(qctx, qctx.client_request.query_options.__isset.mem_limit
+                && qctx.client_request.query_options.mem_limit > 0 ?
+            qctx.client_request.query_options.mem_limit :
+            -1,
+        "test-pool")),
     fragment_ctx_(nullptr),
     instance_ctx_(nullptr),
     local_query_state_(query_state_),

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 486a230..ef5e978 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -113,9 +113,14 @@ Status TestEnv::CreateQueryState(
   query_ctx.query_id.hi = 0;
   query_ctx.query_id.lo = query_id;
   query_ctx.request_pool = "test-pool";
+  TQueryOptions* query_options_to_use = &query_ctx.client_request.query_options;
+  int64_t mem_limit =
+      query_options_to_use->__isset.mem_limit && query_options_to_use->mem_limit > 0 ?
+      query_options_to_use->mem_limit :
+      -1;
 
   // CreateQueryState() enforces the invariant that 'query_id' must be unique.
-  QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx);
+  QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx, mem_limit);
   query_states_.push_back(qs);
   // make sure to initialize data structures unrelated to the TExecQueryFInstancesParams
   // param

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 2e1f7d9..a23fce7 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -41,7 +41,7 @@ DEFINE_int64(queue_wait_timeout_ms, 60 * 1000, "Maximum amount of time (in "
 namespace impala {
 
 /// Convenience method.
-std::string PrintBytes(int64_t value) {
+string PrintBytes(int64_t value) {
   return PrettyPrinter::Print(value, TUnit::BYTES);
 }
 
@@ -89,6 +89,12 @@ const string POOL_MAX_REQUESTS_METRIC_KEY_FORMAT =
   "admission-controller.pool-max-requests.$0";
 const string POOL_MAX_QUEUED_METRIC_KEY_FORMAT =
   "admission-controller.pool-max-queued.$0";
+const string POOL_MAX_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT =
+  "admission-controller.pool-max-query-mem-limit.$0";
+const string POOL_MIN_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT =
+  "admission-controller.pool-min-query-mem-limit.$0";
+const string POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT =
+  "admission-controller.pool-clamp-mem-limit-query-option.$0";
 
 // Profile query events
 const string QUERY_EVENT_SUBMIT_FOR_ADMISSION = "Submit for admission";
@@ -106,14 +112,16 @@ const string PROFILE_INFO_VAL_TIME_OUT = "Timed out (queued)";
 const string PROFILE_INFO_KEY_INITIAL_QUEUE_REASON = "Initial admission queue reason";
 const string PROFILE_INFO_VAL_INITIAL_QUEUE_REASON = "waited $0 ms, reason: $1";
 const string PROFILE_INFO_KEY_LAST_QUEUED_REASON = "Latest admission queue reason";
+const string PROFILE_INFO_KEY_ADMITTED_MEM = "Cluster Memory Admitted";
 
 // Error status string details
 const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION =
-    "minimum memory reservation is greater than memory available to the query "
-    "for buffer reservations. Memory reservation needed given the current plan: $0. Set "
-    "mem_limit to at least $1. Note that changing the mem_limit may also change the "
-    "plan. See the query profile for more information about the per-node memory "
-    "requirements.";
+    "minimum memory reservation is greater than memory available to the query for buffer "
+    "reservations. Memory reservation needed given the current plan: $0. Adjust either "
+    "the mem_limit or the pool config (max-query-mem-limit, min-query-mem-limit) for the "
+    "query to allow the query memory limit to be at least $1. Note that changing the "
+    "mem_limit may also change the plan. See the query profile for more information "
+    "about the per-node memory requirements.";
 const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION =
     "minimum memory reservation on backend '$0' is greater than memory available to the "
     "query for buffer reservations. Increase the buffer_pool_limit to $1. See the query "
@@ -256,9 +264,10 @@ Status AdmissionController::Init() {
 }
 
 void AdmissionController::PoolStats::Admit(const QuerySchedule& schedule) {
-  int64_t mem_admitted = schedule.GetClusterMemoryEstimate();
-  local_mem_admitted_ += mem_admitted;
-  metrics_.local_mem_admitted->Increment(mem_admitted);
+  int64_t cluster_mem_admitted = schedule.GetClusterMemoryToAdmit();
+  DCHECK_GT(cluster_mem_admitted, 0);
+  local_mem_admitted_ += cluster_mem_admitted;
+  metrics_.local_mem_admitted->Increment(cluster_mem_admitted);
 
   agg_num_running_ += 1;
   metrics_.agg_num_running->Increment(1L);
@@ -270,9 +279,10 @@ void AdmissionController::PoolStats::Admit(const QuerySchedule& schedule) {
 }
 
 void AdmissionController::PoolStats::Release(const QuerySchedule& schedule) {
-  int64_t mem_admitted = schedule.GetClusterMemoryEstimate();
-  local_mem_admitted_ -= mem_admitted;
-  metrics_.local_mem_admitted->Increment(-mem_admitted);
+  int64_t cluster_mem_admitted = schedule.GetClusterMemoryToAdmit();
+  DCHECK_GT(cluster_mem_admitted, 0);
+  local_mem_admitted_ -= cluster_mem_admitted;
+  metrics_.local_mem_admitted->Increment(-cluster_mem_admitted);
 
   agg_num_running_ -= 1;
   metrics_.agg_num_running->Increment(-1L);
@@ -315,6 +325,7 @@ void AdmissionController::PoolStats::Dequeue(const QuerySchedule& schedule,
 
 void AdmissionController::UpdateHostMemAdmitted(const QuerySchedule& schedule,
     int64_t per_node_mem) {
+  DCHECK_NE(per_node_mem, 0);
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
     const string host = TNetworkAddressToString(host_addr);
@@ -326,6 +337,25 @@ void AdmissionController::UpdateHostMemAdmitted(const QuerySchedule& schedule,
   }
 }
 
+bool AdmissionController::CanAccommodateMaxInitialReservation(
+    const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
+    string* mem_unavailable_reason) {
+  const int64_t per_backend_mem_limit = schedule.per_backend_mem_limit();
+  if (per_backend_mem_limit > 0) {
+    const int64_t max_reservation =
+        ReservationUtil::GetReservationLimitFromMemLimit(per_backend_mem_limit);
+    const int64_t largest_min_mem_reservation = schedule.largest_min_reservation();
+    if (largest_min_mem_reservation > max_reservation) {
+      const int64_t required_mem_limit =
+          ReservationUtil::GetMinMemLimitFromReservation(largest_min_mem_reservation);
+      *mem_unavailable_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
+          PrintBytes(largest_min_mem_reservation), PrintBytes(required_mem_limit));
+      return false;
+    }
+  }
+  return true;
+}
+
 bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule,
     const TPoolConfig& pool_cfg, string* mem_unavailable_reason) {
   const string& pool_name = schedule.request_pool();
@@ -339,17 +369,17 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   //    specified.
   // 2) Each individual backend must have enough mem available within its process limit
   //    to execute the query.
-  int64_t per_node_mem_needed = schedule.GetPerHostMemoryEstimate();
-  int64_t cluster_mem_needed = schedule.GetClusterMemoryEstimate();
+  int64_t per_host_mem_to_admit = schedule.per_backend_mem_to_admit();
+  int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
 
   // Case 1:
   PoolStats* stats = GetPoolStats(pool_name);
   VLOG_RPC << "Checking agg mem in pool=" << pool_name << " : " << stats->DebugString()
-           << " cluster_mem_needed=" << PrintBytes(cluster_mem_needed)
+           << " cluster_mem_needed=" << PrintBytes(cluster_mem_to_admit)
            << " pool_max_mem=" << PrintBytes(pool_max_mem);
-  if (stats->EffectiveMemReserved() + cluster_mem_needed > pool_max_mem) {
+  if (stats->EffectiveMemReserved() + cluster_mem_to_admit > pool_max_mem) {
     *mem_unavailable_reason = Substitute(POOL_MEM_NOT_AVAILABLE, pool_name,
-        PrintBytes(pool_max_mem), PrintBytes(cluster_mem_needed),
+        PrintBytes(pool_max_mem), PrintBytes(cluster_mem_to_admit),
         PrintBytes(max(pool_max_mem - stats->EffectiveMemReserved(), 0L)));
     return false;
   }
@@ -364,30 +394,42 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
              << " mem_admitted=" << PrintBytes(mem_admitted)
-             << " needs=" << PrintBytes(per_node_mem_needed)
+             << " needs=" << PrintBytes(per_host_mem_to_admit)
              << " proc_limit=" << PrintBytes(proc_mem_limit);
     int64_t effective_host_mem_reserved = std::max(mem_reserved, mem_admitted);
-    if (effective_host_mem_reserved + per_node_mem_needed > proc_mem_limit) {
+    if (effective_host_mem_reserved + per_host_mem_to_admit > proc_mem_limit) {
       *mem_unavailable_reason = Substitute(HOST_MEM_NOT_AVAILABLE, host_id,
-          PrintBytes(per_node_mem_needed),
+          PrintBytes(per_host_mem_to_admit),
           PrintBytes(max(proc_mem_limit - effective_host_mem_reserved, 0L)),
           PrintBytes(proc_mem_limit));
       return false;
     }
   }
-
+  const TQueryOptions& query_opts = schedule.query_options();
+  if (!query_opts.__isset.buffer_pool_limit || query_opts.buffer_pool_limit <= 0) {
+    // Check if a change in pool_cfg.max_query_mem_limit (while the query was queued)
+    // resulted in a decrease in the computed per_host_mem_limit such that it can no
+    // longer accommodate the largest min_reservation.
+    return CanAccommodateMaxInitialReservation(
+        schedule, pool_cfg, mem_unavailable_reason);
+  }
   return true;
 }
 
 bool AdmissionController::CanAdmitRequest(const QuerySchedule& schedule,
     const TPoolConfig& pool_cfg, bool admit_from_queue, string* not_admitted_reason) {
+  // Can't admit if:
+  //  (a) Pool configuration is invalid
+  //  (b) There are already queued requests (and this is not admitting from the queue).
+  //  (c) Already at the maximum number of requests
+  //  (d) There are not enough memory resources available for the query
+
+  // Queries from a misconfigured pool will remain queued till they either time out or the
+  // pool config is changed to a valid config.
+  if (!IsPoolConfigValid(pool_cfg, not_admitted_reason)) return false;
+
   const string& pool_name = schedule.request_pool();
   PoolStats* stats = GetPoolStats(pool_name);
-
-  // Can't admit if:
-  //  (a) There are already queued requests (and this is not admitting from the queue).
-  //  (b) Already at the maximum number of requests
-  //  (c) Request will go over the mem limit
   if (!admit_from_queue && stats->local_stats().num_queued > 0) {
     *not_admitted_reason = Substitute(QUEUED_QUEUE_NOT_EMPTY,
         stats->local_stats().num_queued);
@@ -410,6 +452,7 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
   // immediately. The first check that fails is the error that is reported. The order of
   // the checks isn't particularly important, though some thought was given to ordering
   // them in a way that might make the sense for a user.
+  if (!IsPoolConfigValid(pool_cfg, rejection_reason)) return true;
 
   // Compute the max (over all backends) and cluster total (across all backends) for
   // min_mem_reservation_bytes and thread_reservation and the min (over all backends)
@@ -445,18 +488,9 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
           PrintBytes(largest_min_mem_reservation.second));
       return true;
     }
-  } else if (query_opts.__isset.mem_limit && query_opts.mem_limit > 0) {
+  } else if (!CanAccommodateMaxInitialReservation(schedule, pool_cfg, rejection_reason)) {
     // If buffer_pool_limit is not explicitly set, it's calculated from mem_limit.
-    const int64_t mem_limit = query_opts.mem_limit;
-    const int64_t max_reservation =
-        ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
-    if (largest_min_mem_reservation.second > max_reservation) {
-      const int64_t required_mem_limit = ReservationUtil::GetMinMemLimitFromReservation(
-          largest_min_mem_reservation.second);
-      *rejection_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
-          PrintBytes(largest_min_mem_reservation.second), PrintBytes(required_mem_limit));
-      return true;
-    }
+    return true;
   }
 
   // Check thread reservation limits.
@@ -464,8 +498,8 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
       && query_opts.thread_reservation_limit > 0
       && max_thread_reservation.second > query_opts.thread_reservation_limit) {
     *rejection_reason = Substitute(REASON_THREAD_RESERVATION_LIMIT_EXCEEDED,
-        TNetworkAddressToString(*max_thread_reservation.first), max_thread_reservation.second,
-        query_opts.thread_reservation_limit);
+        TNetworkAddressToString(*max_thread_reservation.first),
+        max_thread_reservation.second, query_opts.thread_reservation_limit);
     return true;
   }
   if (query_opts.__isset.thread_reservation_aggregate_limit
@@ -495,16 +529,16 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
           PrintBytes(cluster_min_mem_reservation_bytes));
       return true;
     }
-    if (schedule.GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) {
+    int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
+    if (cluster_mem_to_admit > pool_cfg.max_mem_resources) {
       *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM,
-          PrintBytes(schedule.GetClusterMemoryEstimate()),
-          PrintBytes(pool_cfg.max_mem_resources));
+          PrintBytes(cluster_mem_to_admit), PrintBytes(pool_cfg.max_mem_resources));
       return true;
     }
-    int64_t perHostMemoryEstimate = schedule.GetPerHostMemoryEstimate();
-    if (perHostMemoryEstimate > min_proc_mem_limit.second) {
+    int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
+    if (per_backend_mem_to_admit > min_proc_mem_limit.second) {
       *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-          PrintBytes(perHostMemoryEstimate), PrintBytes(min_proc_mem_limit.second),
+          PrintBytes(per_backend_mem_to_admit), PrintBytes(min_proc_mem_limit.second),
           TNetworkAddressToString(*min_proc_mem_limit.first));
       return true;
     }
@@ -525,19 +559,24 @@ void AdmissionController::PoolStats::UpdateConfigMetrics(const TPoolConfig& pool
   metrics_.pool_max_mem_resources->SetValue(pool_cfg.max_mem_resources);
   metrics_.pool_max_requests->SetValue(pool_cfg.max_requests);
   metrics_.pool_max_queued->SetValue(pool_cfg.max_queued);
+  metrics_.max_query_mem_limit->SetValue(pool_cfg.max_query_mem_limit);
+  metrics_.min_query_mem_limit->SetValue(pool_cfg.min_query_mem_limit);
+  metrics_.clamp_mem_limit_query_option->SetValue(
+      pool_cfg.clamp_mem_limit_query_option);
 }
 
-Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
+Status AdmissionController::SubmitForAdmission(QuerySchedule* schedule,
     Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome) {
   const string& pool_name = schedule->request_pool();
   TPoolConfig pool_cfg;
   RETURN_IF_ERROR(request_pool_service_->GetPoolConfig(pool_name, &pool_cfg));
+  schedule->UpdateMemoryRequirements(pool_cfg);
   const int64_t max_requests = pool_cfg.max_requests;
   const int64_t max_queued = pool_cfg.max_queued;
   const int64_t max_mem = pool_cfg.max_mem_resources;
 
   // Note the queue_node will not exist in the queue when this method returns.
-  QueueNode queue_node(*schedule, admit_outcome, schedule->summary_profile());
+  QueueNode queue_node(schedule, admit_outcome, schedule->summary_profile());
   string not_admitted_reason;
 
   schedule->query_events()->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION);
@@ -550,8 +589,8 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
     PoolStats* stats = GetPoolStats(pool_name);
     stats->UpdateConfigMetrics(pool_cfg);
     VLOG_QUERY << "Schedule for id=" << PrintId(schedule->query_id()) << " in pool_name="
-               << pool_name << " cluster_mem_needed="
-               << PrintBytes(schedule->GetClusterMemoryEstimate())
+               << pool_name << " per_host_mem_estimate="
+               << PrintBytes(schedule->GetPerHostMemoryEstimate())
                << " PoolConfig: max_requests=" << max_requests << " max_queued="
                << max_queued << " max_mem=" << PrintBytes(max_mem);
     VLOG_QUERY << "Stats: " << stats->DebugString();
@@ -587,10 +626,7 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
         return Status::CANCELLED;
       }
       VLOG_QUERY << "Admitted query id=" << PrintId(schedule->query_id());
-      stats->Admit(*schedule);
-      UpdateHostMemAdmitted(*schedule, schedule->GetPerHostMemoryEstimate());
-      schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT,
-          PROFILE_INFO_VAL_ADMIT_IMMEDIATELY);
+      AdmitQuery(schedule, true);
       VLOG_RPC << "Final: " << stats->DebugString();
       return Status::OK();
     }
@@ -664,8 +700,6 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
     // not change them here.
     DCHECK_ENUM_EQ(outcome, AdmissionOutcome::ADMITTED);
     DCHECK(!queue->Contains(&queue_node));
-    schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT,
-        PROFILE_INFO_VAL_ADMIT_QUEUED);
     VLOG_QUERY << "Admitted queued query id=" << PrintId(schedule->query_id());
     VLOG_RPC << "Final: " << stats->DebugString();
     return Status::OK();
@@ -678,7 +712,7 @@ void AdmissionController::ReleaseQuery(const QuerySchedule& schedule) {
     lock_guard<mutex> lock(admission_ctrl_lock_);
     PoolStats* stats = GetPoolStats(pool_name);
     stats->Release(schedule);
-    UpdateHostMemAdmitted(schedule, -schedule.GetPerHostMemoryEstimate());
+    UpdateHostMemAdmitted(schedule, -schedule.per_backend_mem_to_admit());
     pools_for_updates_.insert(pool_name);
     VLOG_RPC << "Released query id=" << PrintId(schedule.query_id()) << " "
              << stats->DebugString();
@@ -939,31 +973,31 @@ void AdmissionController::DequeueLoop() {
       while (max_to_dequeue > 0 && !queue.empty()) {
         QueueNode* queue_node = queue.head();
         DCHECK(queue_node != nullptr);
-        const QuerySchedule& schedule = queue_node->schedule;
+        QuerySchedule* schedule = queue_node->schedule;
+        schedule->UpdateMemoryRequirements(pool_config);
         bool is_cancelled = queue_node->admit_outcome->IsSet()
             && queue_node->admit_outcome->Get() == AdmissionOutcome::CANCELLED;
         string not_admitted_reason;
         // TODO: Requests further in the queue may be blocked unnecessarily. Consider a
         // better policy once we have better test scenarios.
         if (!is_cancelled
-            && !CanAdmitRequest(schedule, pool_config, true, &not_admitted_reason)) {
+            && !CanAdmitRequest(*schedule, pool_config, true, &not_admitted_reason)) {
           LogDequeueFailed(queue_node, not_admitted_reason);
           break;
         }
-        VLOG_RPC << "Dequeuing query=" << PrintId(schedule.query_id());
+        VLOG_RPC << "Dequeuing query=" << PrintId(schedule->query_id());
         queue.Dequeue();
         --max_to_dequeue;
-        stats->Dequeue(schedule, false);
+        stats->Dequeue(*schedule, false);
         // If query is already cancelled, just dequeue and continue.
         AdmissionOutcome outcome =
             queue_node->admit_outcome->Set(AdmissionOutcome::ADMITTED);
         if (outcome == AdmissionOutcome::CANCELLED) {
-          VLOG_QUERY << "Dequeued cancelled query=" << PrintId(schedule.query_id());
+          VLOG_QUERY << "Dequeued cancelled query=" << PrintId(schedule->query_id());
           continue;
         }
         DCHECK_ENUM_EQ(outcome, AdmissionOutcome::ADMITTED);
-        stats->Admit(schedule);
-        UpdateHostMemAdmitted(schedule, schedule.GetPerHostMemoryEstimate());
+        AdmitQuery(schedule, false);
       }
       pools_for_updates_.insert(pool_name);
     }
@@ -973,7 +1007,7 @@ void AdmissionController::DequeueLoop() {
 void AdmissionController::LogDequeueFailed(QueueNode* node,
     const string& not_admitted_reason) {
   VLOG_QUERY << "Could not dequeue query id="
-             << PrintId(node->schedule.query_id())
+             << PrintId(node->schedule->query_id())
              << " reason: " << not_admitted_reason;
   node->profile->AddInfoString(PROFILE_INFO_KEY_LAST_QUEUED_REASON,
       not_admitted_reason);
@@ -990,6 +1024,42 @@ AdmissionController::GetPoolStats(const string& pool_name) {
   return &it->second;
 }
 
+bool AdmissionController::IsPoolConfigValid(const TPoolConfig& pool_cfg, string* reason) {
+  if (pool_cfg.max_query_mem_limit > 0
+      && pool_cfg.min_query_mem_limit > pool_cfg.max_query_mem_limit) {
+    *reason = Substitute("Invalid pool config: the min_query_mem_limit is greater than "
+                         "the max_query_mem_limit ($0 > $1)",
+        pool_cfg.min_query_mem_limit, pool_cfg.max_query_mem_limit);
+    return false;
+  }
+  if (pool_cfg.max_mem_resources > 0
+      && pool_cfg.min_query_mem_limit > pool_cfg.max_mem_resources) {
+    *reason = Substitute("Invalid pool config: the min_query_mem_limit is greater than "
+                         "the max_mem_resources ($0 > $1)",
+        pool_cfg.min_query_mem_limit, pool_cfg.max_mem_resources);
+    return false;
+  }
+  return true;
+}
+
+void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) {
+  PoolStats* pool_stats = GetPoolStats(schedule->request_pool());
+  VLOG_RPC << "For Query " << schedule->query_id() << " per_backend_mem_limit set to: "
+           << PrintBytes(schedule->per_backend_mem_limit())
+           << " per_backend_mem_to_admit set to: "
+           << PrintBytes(schedule->per_backend_mem_to_admit());
+  // Update memory accounting.
+  pool_stats->Admit(*schedule);
+  UpdateHostMemAdmitted(*schedule, schedule->per_backend_mem_to_admit());
+  // Update summary profile.
+  const string& admission_result =
+      was_queued ? PROFILE_INFO_VAL_ADMIT_QUEUED : PROFILE_INFO_VAL_ADMIT_IMMEDIATELY;
+  schedule->summary_profile()->AddInfoString(
+      PROFILE_INFO_KEY_ADMISSION_RESULT, admission_result);
+  schedule->summary_profile()->AddInfoString(
+      PROFILE_INFO_KEY_ADMITTED_MEM, PrintBytes(schedule->GetClusterMemoryToAdmit()));
+}
+
 void AdmissionController::PoolStats::InitMetrics() {
   metrics_.total_admitted = parent_->metrics_group_->AddCounter(
       TOTAL_ADMITTED_METRIC_KEY_FORMAT, 0, name_);
@@ -1030,5 +1100,11 @@ void AdmissionController::PoolStats::InitMetrics() {
       POOL_MAX_REQUESTS_METRIC_KEY_FORMAT, 0, name_);
   metrics_.pool_max_queued = parent_->metrics_group_->AddGauge(
       POOL_MAX_QUEUED_METRIC_KEY_FORMAT, 0, name_);
+  metrics_.max_query_mem_limit = parent_->metrics_group_->AddGauge(
+      POOL_MAX_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT, 0, name_);
+  metrics_.min_query_mem_limit = parent_->metrics_group_->AddGauge(
+      POOL_MIN_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT, 0, name_);
+  metrics_.clamp_mem_limit_query_option = parent_->metrics_group_->AddProperty<bool>(
+      POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT, false, name_);
 }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index e5ffd99..b88356c 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -53,10 +53,12 @@ enum class AdmissionOutcome {
 /// on available cluster resources, which are configured in one or more resource pools. A
 /// request will either be admitted for immediate execution, queued for later execution,
 /// or rejected.  Resource pools can be configured to have maximum number of concurrent
-/// queries, maximum memory, and a maximum queue size. Queries will be queued if there
-/// are already too many queries executing or there isn't enough available memory. Once
-/// the queue reaches the maximum queue size, incoming queries will be rejected. Requests
-/// in the queue will time out after a configurable timeout.
+/// queries, maximum cluster wide memory, maximum queue size, max and min per host memory
+/// limit for every query, and to set whether the mem_limit query option will be clamped
+/// by the previously mentioned max/min per host limits or not. Queries will be queued if
+/// there are already too many queries executing or there isn't enough available memory.
+/// Once the queue reaches the maximum queue size, incoming queries will be rejected.
+/// Requests in the queue will time out after a configurable timeout.
 ///
 /// Any impalad can act as a coordinator and thus also an admission controller, so some
 /// cluster state must be shared between impalads in order to make admission decisions on
@@ -88,17 +90,27 @@ enum class AdmissionOutcome {
 ///
 /// The memory required for admission for a request is specified as the query option
 /// MEM_LIMIT (either explicitly or via a default value). This is a per-node value. If
-/// there is no memory limit, the per-node estimate from planning is used instead. The
-/// following two conditions must hold in order for the request to be admitted:
-///  1) There must be enough memory resources available in this resource pool for the
+/// there is no memory limit, the per-node estimate from planning is used instead as a
+/// memory limit and a lower bound is enforced on it based on the largest initial
+/// reservation of the query. The final memory limit used is also clamped by the max/min
+/// memory limits configured for the pool with an option to not enforce these limits on
+/// the MEM_LIMIT query option (If both these max/min limits are not configured, then the
+/// estimates from planning are not used as a memory limit and only used for making
+/// admission decisions. Moreover the estimates will no longer have a lower bound based on
+/// the largest initial reservation).
+/// The following three conditions must hold in order for the request to be admitted:
+///  1) The current pool configuration is valid.
+///  2) There must be enough memory resources available in this resource pool for the
 ///     request. The max memory resources configured for the resource pool specifies the
 ///     aggregate, cluster-wide memory that may be reserved by all executing queries in
 ///     this pool. Thus the aggregate memory to be reserved across all participating
 ///     backends for this request, *plus* that of already admitted requests must be less
 ///     than or equal to the max resources specified.
-///  2) All participating backends must have enough memory available. Each impalad has a
+///  3) All participating backends must have enough memory available. Each impalad has a
 ///     per-process mem limit, and that is the max memory that can be reserved on that
 ///     backend.
+///  4) The final per host memory limit used can accommodate the largest Initial
+///     reservation.
 ///
 /// In order to admit based on these conditions, the admission controller accounts for
 /// the following on both a per-host and per-pool basis:
@@ -117,7 +129,7 @@ enum class AdmissionOutcome {
 ///  b) Mem Admitted: the amount of memory required (i.e. the value used in admission,
 ///     either the mem limit or estimate) for the requests that this impalad's admission
 ///     controller has admitted. Both the per-pool and per-host accounting is updated
-///     when requests are admitted and released (and note: not via the statestore, so
+///     when requests are admitted and released (and NOTE: not via the statestore, so
 ///     there is no latency, but this does not account for memory from requests admitted
 ///     by other impalads).
 ///
@@ -132,42 +144,46 @@ enum class AdmissionOutcome {
 ///
 /// Example:
 /// Consider a 10-node cluster with 100gb/node and a resource pool 'q1' configured with
-/// 500gb of memory. An incoming request with a 40gb MEM_LIMIT and schedule to execute on
-/// all backends is received by AdmitQuery() on an otherwise quiet cluster.
-/// CanAdmitRequest() checks the number of running queries and then calls
+/// 500gb of aggregate memory and 40gb as the max memory limit. An incoming request with
+/// the MEM_LIMIT query option set to 50gb and scheduled to execute on all backends is
+/// received by AdmitQuery() on an otherwise quiet cluster. Based on the pool
+/// configuration, a per host mem limit of 40gb is used for this query and for any
+/// subsequent checks that it needs to pass prior admission. CanAdmitRequest() checks for
+/// a valid pool config and the number of running queries and then calls
 /// HasAvailableMemResources() to check for memory resources. It first checks whether
 /// there is enough memory for the request using PoolStats::EffectiveMemReserved() (which
 /// is the max of the pool's agg_mem_reserved_ and local_mem_admitted_, see #1 above),
-/// and then checks for enough memory on each individual host via the max of the values
-/// in the host_mem_reserved_ and host_mem_admitted_ maps (see #2 above). In this case,
-/// ample resources are available so CanAdmitRequest() returns true.  PoolStats::Admit()
-/// is called to update q1's PoolStats: it first updates agg_num_running_ and
-/// local_mem_admitted_ which are able to be used immediately for incoming admission
-/// requests, then it updates num_admitted_running in the struct sent to the statestore
-/// (local_stats_). UpdateHostMemAdmitted() is called to update the per-host admitted mem
-/// (stored in the map host_mem_admitted_) for all participating hosts. Then AdmitQuery()
-/// returns to the Scheduler. If another identical admission request is received by the
-/// same coordinator immediately, it will be rejected because q1's local_mem_admitted_ is
-/// already 400gb. If that request were sent to another impalad at the same time, it
-/// would have been admitted because not all updates have been disseminated yet. The next
-/// statestore update will contain the updated value of num_admitted_running for q1 on
-/// this backend. As remote fragments begin execution on remote impalads, their pool mem
-/// trackers will reflect the updated amount of memory reserved (set in
-/// local_stats_.backend_mem_reserved by UpdateMemTrackerStats()) and the next statestore
-/// updates coming from those impalads will send the updated value.  As the statestore
-/// updates are received (in the subscriber callback fn UpdatePoolStats()), the incoming
-/// per-backend, per-pool mem_reserved values are aggregated to
-/// PoolStats::agg_mem_reserved_ (pool aggregate over all hosts) and
-/// backend_mem_reserved_ (per-host aggregates over all pools). Once this has happened,
-/// any incoming admission request now has the updated state required to make correct
-/// admission decisions.
+/// then checks for enough memory on each individual host via the max of the values in the
+/// host_mem_reserved_ and host_mem_admitted_ maps (see #2 above) and finally checks if
+/// the memory limit used for this query can accommodate its largest initial reservation.
+/// In this case, ample resources are available so CanAdmitRequest() returns true.
+/// PoolStats::Admit() is called to update q1's PoolStats: it first updates
+/// agg_num_running_ and local_mem_admitted_ which are able to be used immediately for
+/// incoming admission requests, then it updates num_admitted_running in the struct sent
+/// to the statestore (local_stats_). UpdateHostMemAdmitted() is called to update the
+/// per-host admitted mem (stored in the map host_mem_admitted_) for all participating
+/// hosts. Then AdmitQuery() returns to the Scheduler. If another identical admission
+/// request is received by the same coordinator immediately, it will be rejected because
+/// q1's local_mem_admitted_ is already 400gb. If that request were sent to another
+/// impalad at the same time, it would have been admitted because not all updates have
+/// been disseminated yet. The next statestore update will contain the updated value of
+/// num_admitted_running for q1 on this backend. As remote fragments begin execution on
+/// remote impalads, their pool mem trackers will reflect the updated amount of memory
+/// reserved (set in local_stats_.backend_mem_reserved by UpdateMemTrackerStats()) and the
+/// next statestore updates coming from those impalads will send the updated value. As
+/// the statestore updates are received (in the subscriber callback fn UpdatePoolStats()),
+/// the incoming per-backend, per-pool mem_reserved values are aggregated to
+/// PoolStats::agg_mem_reserved_ (pool aggregate over all hosts) and backend_mem_reserved_
+/// (per-host aggregates over all pools). Once this has happened, any incoming admission
+/// request now has the updated state required to make correct admission decisions.
 ///
 /// Queuing Behavior:
 /// Once the resources in a pool are consumed each coordinator receiving requests will
 /// begin queuing. While each individual queue is FIFO, there is no total ordering on the
 /// queued requests between admission controllers and no FIFO behavior is guaranteed for
 /// requests submitted to different coordinators. When resources become available, there
-/// is no synchronous coordination between nodes used to determine which get to dequeue and
+/// is no synchronous coordination between nodes used to determine which get to dequeue
+/// and
 /// admit requests. Instead, we use a simple heuristic to try to dequeue a number of
 /// requests proportional to the number of requests that are waiting in each individual
 /// admission controller to the total number of requests queued across all admission
@@ -186,10 +202,16 @@ enum class AdmissionOutcome {
 /// proactively cancelled by setting the 'admit_outcome' to AdmissionOutcome::CANCELLED.
 /// This is handled asynchronously by AdmitQuery() and DequeueLoop().
 ///
-/// TODO: Improve the dequeuing policy. IMPALA-2968.
+/// Pool Configuration Mechanism:
+/// The path to pool config files are specified using the startup flags
+/// "fair_scheduler_allocation_path" and "llama_site_path". The format for specifying pool
+/// configs is based on yarn and llama with additions specific to Impala. A file
+/// monitoring service is started that monitors changes made to these files. Those changes
+/// are only propagated to Impala when a new query is serviced. See RequestPoolService
+/// class for more details.
 ///
-/// TODO: Remove less important debug logging after more cluster testing. Should have a
-///       better idea of what is perhaps unnecessary.
+/// TODO: Improve the dequeuing policy. IMPALA-2968.
+
 class AdmissionController {
  public:
   AdmissionController(StatestoreSubscriber* subscriber,
@@ -206,7 +228,7 @@ class AdmissionController {
   /// - Cancelled: <CANCELLED, Status::CANCELLED>
   /// If admitted, ReleaseQuery() should also be called after the query completes or gets
   /// cancelled to ensure that the pool statistics are updated.
-  Status AdmitQuery(QuerySchedule* schedule,
+  Status SubmitForAdmission(QuerySchedule* schedule,
       Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome);
 
   /// Updates the pool statistics when a query completes (either successfully,
@@ -246,10 +268,16 @@ class AdmissionController {
   boost::mutex admission_ctrl_lock_;
 
   /// Maps from host id to memory reserved and memory admitted, both aggregates over all
-  /// pools. See the class doc for a definition of reserved and admitted. Protected by
-  /// admission_ctrl_lock_.
+  /// pools. See the class doc for a detailed definition of reserved and admitted.
+  /// Protected by admission_ctrl_lock_.
   typedef boost::unordered_map<std::string, int64_t> HostMemMap;
+  /// The mem reserved for a query that is currently executing is its memory limit, if set
+  /// (which should be the common case with admission control). Otherwise, if the query
+  /// has no limit or the query is finished executing, the current consumption (tracked
+  /// by its query mem tracker) is used.
   HostMemMap host_mem_reserved_;
+
+  /// The per host mem admitted only for the queries admitted locally.
   HostMemMap host_mem_admitted_;
 
   /// Contains all per-pool statistics and metrics. Accessed via GetPoolStats().
@@ -284,6 +312,9 @@ class AdmissionController {
       IntGauge* pool_max_mem_resources;
       IntGauge* pool_max_requests;
       IntGauge* pool_max_queued;
+      IntGauge* max_query_mem_limit;
+      IntGauge* min_query_mem_limit;
+      BooleanProperty* clamp_mem_limit_query_option;
     };
 
     PoolStats(AdmissionController* parent, const std::string& name)
@@ -390,13 +421,13 @@ class AdmissionController {
   /// during the call to AdmitQuery() but its members live past that and are owned by the
   /// ClientRequestState object associated with them.
   struct QueueNode : public InternalQueue<QueueNode>::Node {
-    QueueNode(const QuerySchedule& query_schedule,
+    QueueNode(QuerySchedule* query_schedule,
         Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admission_outcome,
         RuntimeProfile* profile)
       : schedule(query_schedule), admit_outcome(admission_outcome), profile(profile) {}
 
     /// The query schedule of the queued request.
-    const QuerySchedule& schedule;
+    QuerySchedule* const schedule;
 
     /// The Admission outcome of the queued request.
     Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* const admit_outcome;
@@ -461,6 +492,19 @@ class AdmissionController {
   bool CanAdmitRequest(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
       bool admit_from_queue, std::string* not_admitted_reason);
 
+  /// Returns true if the per host mem limit for the query represented by 'schedule' is
+  /// large enough to accommodate the largest initial reservation required. Otherwise,
+  /// returns false with the details about the memory shortage in
+  /// 'mem_unavailable_reason'. Possible cases where it can return false are:
+  /// 1. The pool.max_query_mem_limit is set too low
+  /// 2. mem_limit in query options is set low and no max/min_query_mem_limit is set in
+  ///    the pool configuration.
+  /// 3. mem_limit in query options is set low and min_query_mem_limit is also set low.
+  /// 4. mem_limit in query options is set low and the pool.min_query_mem_limit is set
+  ///    to a higher value but pool.clamp_mem_limit_query_option is false.
+  bool CanAccommodateMaxInitialReservation(const QuerySchedule& schedule,
+      const TPoolConfig& pool_cfg, string* mem_unavailable_reason);
+
   /// Returns true if there is enough memory available to admit the query based on the
   /// schedule, the aggregate pool memory, and the per-host memory. If not, this returns
   /// false and returns the reason in mem_unavailable_reason. Caller owns
@@ -485,6 +529,15 @@ class AdmissionController {
   /// Log the reason for dequeueing of 'node' failing and add the reason to the query's
   /// profile. Must hold admission_ctrl_lock_.
   void LogDequeueFailed(QueueNode* node, const std::string& not_admitted_reason);
+
+  /// Returns false if pool config is invalid and populates the 'reason' with the reason
+  /// behind invalidity.
+  bool IsPoolConfigValid(const TPoolConfig& pool_cfg, std::string* reason);
+
+  // Sets the per host mem limit and mem admitted in the schedule and does the necessary
+  // accounting and logging on successful submission.
+  // Caller must hold 'admission_ctrl_lock_'.
+  void AdmitQuery(QuerySchedule* schedule, bool was_queued);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 2cb208e..365c981 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -22,6 +22,7 @@
 #include <boost/uuid/uuid.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 
+#include "runtime/bufferpool/reservation-util.h"
 #include "util/container-util.h"
 #include "util/mem-info.h"
 #include "util/network-util.h"
@@ -165,37 +166,9 @@ void QuerySchedule::Validate() const {
   // TODO: add validation for BackendExecParams
 }
 
-int64_t QuerySchedule::GetClusterMemoryEstimate() const {
-  DCHECK_GT(per_backend_exec_params_.size(), 0);
-  const int64_t total_cluster_mem =
-      GetPerHostMemoryEstimate() * per_backend_exec_params_.size();
-  DCHECK_GE(total_cluster_mem, 0); // Assume total cluster memory fits in an int64_t.
-  return total_cluster_mem;
-}
-
 int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
-  // Precedence of different estimate sources is:
-  // user-supplied RM query option >
-  //     query option limit >
-  //       estimate >
-  //         server-side defaults
-  int64_t query_option_memory_limit = numeric_limits<int64_t>::max();
-  bool has_query_option = false;
-  if (query_options_.__isset.mem_limit && query_options_.mem_limit > 0) {
-    query_option_memory_limit = query_options_.mem_limit;
-    has_query_option = true;
-  }
-
-  int64_t per_host_mem = 0L;
-  if (has_query_option) {
-    per_host_mem = query_option_memory_limit;
-  } else {
-    DCHECK(request_.__isset.per_host_mem_estimate);
-    per_host_mem = request_.per_host_mem_estimate;
-  }
-  // Cap the memory estimate at the amount of physical memory available. The user's
-  // provided value or the estimate from planning can each be unreasonable.
-  return min(per_host_mem, MemInfo::physical_mem());
+  DCHECK(request_.__isset.per_host_mem_estimate);
+  return request_.per_host_mem_estimate;
 }
 
 TUniqueId QuerySchedule::GetNextInstanceId() {
@@ -250,4 +223,56 @@ int QuerySchedule::GetNumFragmentInstances() const {
   return total;
 }
 
+int64_t QuerySchedule::GetClusterMemoryToAdmit() const {
+  return per_backend_mem_to_admit() *  per_backend_exec_params_.size();
+}
+
+void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
+  // If the min_query_mem_limit and max_query_mem_limit are not set in the pool config
+  // then it falls back to traditional(old) behavior, which means that, if for_admission
+  // is false, it returns the mem_limit if it is set in the query options, else returns -1
+  // which means no limit; if for_admission is true, it returns the mem_limit if it is set
+  // in the query options, else returns the per host mem estimate calculated during
+  // planning.
+  bool mimic_old_behaviour =
+      pool_cfg.min_query_mem_limit == 0 && pool_cfg.max_query_mem_limit == 0;
+
+  per_backend_mem_to_admit_ = 0;
+  bool has_query_option = false;
+  if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
+    per_backend_mem_to_admit_ = query_options().mem_limit;
+    has_query_option = true;
+  }
+
+  if (!has_query_option) {
+    per_backend_mem_to_admit_ = GetPerHostMemoryEstimate();
+    if (!mimic_old_behaviour) {
+      int64_t min_mem_limit_required = ReservationUtil::GetMinMemLimitFromReservation(
+          largest_min_reservation());
+      per_backend_mem_to_admit_ = max(per_backend_mem_to_admit_, min_mem_limit_required);
+    }
+  }
+
+  if (!has_query_option || pool_cfg.clamp_mem_limit_query_option) {
+    if (pool_cfg.min_query_mem_limit > 0) {
+      per_backend_mem_to_admit_ =
+          max(per_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
+    }
+    if (pool_cfg.max_query_mem_limit > 0) {
+      per_backend_mem_to_admit_ =
+          min(per_backend_mem_to_admit_, pool_cfg.max_query_mem_limit);
+    }
+  }
+
+  // Cap the memory estimate at the amount of physical memory available. The user's
+  // provided value or the estimate from planning can each be unreasonable.
+  per_backend_mem_to_admit_ = min(per_backend_mem_to_admit_, MemInfo::physical_mem());
+
+  if (mimic_old_behaviour && !has_query_option) {
+    per_backend_mem_limit_ = -1;
+  } else {
+    per_backend_mem_limit_ = per_backend_mem_to_admit_;
+  }
+}
+
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index b43cc7b..190caee 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -137,8 +137,15 @@ struct FragmentExecParams {
 /// and the granted resource reservation.
 ///
 /// QuerySchedule is a container class for scheduling data, but it doesn't contain
-/// scheduling logic itself. Its state either comes from the static TQueryExecRequest
-/// or is computed by Scheduler.
+/// scheduling logic itself.
+/// The general usage pattern is that part of its state gets set from the static
+/// TQueryExecRequest during initialization, then the actual schedule gets set by the
+/// scheduler, then finally it is passed to the admission controller that keeps updating
+/// the memory requirements by calling UpdateMemoryRequirements() every time it tries to
+/// admit the query but only sets the final values once the query gets admitted
+/// successfully. Note: Due to this usage pattern the memory requirement values should not
+/// be accessed by other clients of this class while the query is in admission control
+/// phase.
 class QuerySchedule {
  public:
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
@@ -157,13 +164,8 @@ class QuerySchedule {
   // Valid after Schedule() succeeds.
   const std::string& request_pool() const { return request().query_ctx.request_pool; }
 
-  /// Gets the estimated memory (bytes) per-node. Returns the user specified estimate
-  /// (MEM_LIMIT query parameter) if provided or the estimate from planning if available,
-  /// but is capped at the amount of physical memory to avoid problems if either estimate
-  /// is unreasonably large.
+  /// Returns the estimated memory (bytes) per-node from planning.
   int64_t GetPerHostMemoryEstimate() const;
-  /// Total estimated memory for all nodes. set_num_hosts() must be set before calling.
-  int64_t GetClusterMemoryEstimate() const;
 
   /// Helper methods used by scheduler to populate this QuerySchedule.
   void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
@@ -221,10 +223,35 @@ class QuerySchedule {
   RuntimeProfile* summary_profile() { return summary_profile_; }
   RuntimeProfile::EventSequence* query_events() { return query_events_; }
 
+  int64_t largest_min_reservation() const { return largest_min_reservation_; }
+
+  /// Must call UpdateMemoryRequirements() at least once before calling this.
+  int64_t per_backend_mem_limit() const { return per_backend_mem_limit_; }
+
+  /// Must call UpdateMemoryRequirements() at least once before calling this.
+  int64_t per_backend_mem_to_admit() const {
+    DCHECK_GT(per_backend_mem_to_admit_, 0);
+    return per_backend_mem_to_admit_;
+  }
+
   void set_per_backend_exec_params(const PerBackendExecParams& params) {
     per_backend_exec_params_ = params;
   }
 
+  void set_largest_min_reservation(const int64_t largest_min_reservation) {
+    largest_min_reservation_ = largest_min_reservation;
+  }
+
+  /// Returns the Cluster wide memory admitted by the admission controller.
+  /// Must call UpdateMemoryRequirements() at least once before calling this.
+  int64_t GetClusterMemoryToAdmit() const;
+
+  /// Populates or updates the per host query memory limit and the amount of memory to be
+  /// admitted based on the pool configuration passed to it. Must be called at least once
+  /// before making any calls to per_backend_mem_to_admit(), per_backend_mem_limit() and
+  /// GetClusterMemoryToAdmit().
+  void UpdateMemoryRequirements(const TPoolConfig& pool_cfg);
+
  private:
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
@@ -259,6 +286,20 @@ class QuerySchedule {
   /// Used to generate consecutive fragment instance ids.
   TUniqueId next_instance_id_;
 
+  /// The largest min memory reservation across all backends. Set in
+  /// Scheduler::Schedule().
+  int64_t largest_min_reservation_ = 0;
+
+  /// The memory limit per backend that will be imposed on the query.
+  /// Set by the admission controller with a value that is only valid if it was admitted
+  /// successfully. -1 means no limit.
+  int64_t per_backend_mem_limit_ = 0;
+
+  /// The per backend memory used for admission accounting.
+  /// Set by the admission controller with a value that is only valid if it was admitted
+  /// successfully.
+  int64_t per_backend_mem_to_admit_ = 0;
+
   /// Populate fragment_exec_params_ from request_.plan_exec_info.
   /// Sets is_coord_fragment and input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/request-pool-service.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc
index bad5ac1..f9fdf88 100644
--- a/be/src/scheduling/request-pool-service.cc
+++ b/be/src/scheduling/request-pool-service.cc
@@ -195,6 +195,9 @@ Status RequestPoolService::GetPoolConfig(const string& pool_name,
         FLAGS_disable_pool_mem_limits ? -1 : default_pool_mem_limit_);
     pool_config->__set_max_queued(FLAGS_default_pool_max_queued);
     pool_config->__set_default_query_options("");
+    pool_config->__set_min_query_mem_limit(0);
+    pool_config->__set_max_query_mem_limit(0);
+    pool_config->__set_clamp_mem_limit_query_option(true);
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 43bf199..2d51d5c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -787,12 +787,16 @@ void Scheduler::ComputeBackendExecParams(
     }
   }
 
+  int64_t largest_min_reservation = 0;
   for (auto& backend: per_backend_params) {
     const TNetworkAddress& host = backend.first;
     backend.second.proc_mem_limit =
         LookUpBackendDesc(executor_config, host).proc_mem_limit;
+    largest_min_reservation =
+        max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
   }
   schedule->set_per_backend_exec_params(per_backend_params);
+  schedule->set_largest_min_reservation(largest_min_reservation);
 
   stringstream min_mem_reservation_ss;
   for (const auto& e: per_backend_params) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index c19c974..c71429a 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -506,8 +506,9 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
   DebugActionNoFail(schedule_->query_options(), "CRS_BEFORE_ADMISSION");
 
   DCHECK(exec_env_->admission_controller() != nullptr);
-  Status admit_status = ExecEnv::GetInstance()->admission_controller()->AdmitQuery(
-      schedule_.get(), &admit_outcome_);
+  Status admit_status =
+      ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
+          schedule_.get(), &admit_outcome_);
   {
     lock_guard<mutex> l(lock_);
     if (!UpdateQueryStatus(admit_status).ok()) return;

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index cf8233d..7222dae 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -546,6 +546,10 @@ struct TExecQueryFInstancesParams {
   // operators in all fragment instances that execute on this backend. This is used for
   // an optimization in InitialReservation. Measured in bytes. required in V1
   7: optional i64 initial_mem_reservation_total_claims
+
+  // The backend memory limit (in bytes) as set by the admission controller. Used by the
+  // query mem tracker to enforce the memory limit. required in V1
+  8: optional i64 per_backend_mem_limit
 }
 
 struct TExecQueryFInstancesResult {
@@ -773,6 +777,20 @@ struct TPoolConfig {
 
   // Default query options that are applied to requests mapped to this pool.
   5: required string default_query_options;
+
+  // Maximum amount of memory that can be assigned to a query (in bytes).
+  // 0 indicates no limit. If both max_query_mem_limit and min_query_mem_limit are zero
+  // then the admission controller will fall back on old behavior, which is to not set
+  // any backend mem limit if mem_limit is not set in the query options.
+  6: required i64 max_query_mem_limit = 0;
+
+  // Minimum amount of memory that can be assigned to a query (in bytes).
+  // 0 indicates no limit.
+  7: required i64 min_query_mem_limit = 0;
+
+  // If false, the mem_limit query option will not be bounded by the max/min query mem
+  // limits specified for the pool. Default is true.
+  8: required bool clamp_mem_limit_query_option = true;
 }
 
 struct TBloomFilter {

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index c40716a..72fab16 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -50,6 +50,36 @@
     "key": "admission-controller.pool-max-queued.$0"
   },
   {
+    "description": "Resource Pool $0 Max Query Memory Limit",
+    "contexts": [
+      "RESOURCE_POOL"
+    ],
+    "label": "Resource Pool $0 Max Query Memory Limit",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "admission-controller.pool-max-query-mem-limit.$0"
+  },
+  {
+    "description": "Resource Pool $0 Min Query Memory Limit",
+    "contexts": [
+      "RESOURCE_POOL"
+    ],
+    "label": "Resource Pool $0 Min Query Memory Limit",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "admission-controller.pool-min-query-mem-limit.$0"
+  },
+  {
+    "description": "If false, the mem_limit query option will not be bounded by the max/min query mem limits specified for the pool",
+    "contexts": [
+      "RESOURCE_POOL"
+    ],
+    "label": "Resource Pool $0 Clamp 'MEM_LIMIT' Query Option Flag",
+    "units": "NONE",
+    "kind": "PROPERTY",
+    "key": "admission-controller.pool-clamp-mem-limit-query-option.$0"
+  },
+  {
     "description": "Resource Pool $0 Aggregate Queue Size",
     "contexts": [
       "RESOURCE_POOL"

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
index a104399..af9fe7f 100644
--- a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
+++ b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
@@ -50,6 +50,7 @@ import org.apache.impala.util.FileWatchService.FileChangeListener;
 import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
 import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
 import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -114,6 +115,19 @@ public class RequestPoolService {
   // use this.
   final static String QUERY_OPTIONS_KEY = "impala.admission-control.pool-default-query-options";
 
+  // Keys for the pool max and min query mem limits (in bytes) respectively. This is
+  // specified in the llama-site.xml but is Impala-specific and Llama does not use this.
+  final static String MAX_QUERY_MEM_LIMIT_BYTES =
+      "impala.admission-control.max-query-mem-limit";
+  final static String MIN_QUERY_MEM_LIMIT_BYTES =
+      "impala.admission-control.min-query-mem-limit";
+
+  // Key for specifying if the mem_limit query option can override max/min mem limits
+  // of the pool. This is specified in the llama-site.xml but is Impala-specific and
+  // Llama does not use this.
+  final static String CLAMP_MEM_LIMIT_QUERY_OPTION =
+      "impala.admission-control.clamp-mem-limit-query-option";
+
   // String format for a per-pool configuration key. First parameter is the key for the
   // default, e.g. LLAMA_MAX_PLACED_RESERVATIONS_KEY, and the second parameter is the
   // pool name.
@@ -369,11 +383,17 @@ public class RequestPoolService {
           LLAMA_MAX_QUEUED_RESERVATIONS_DEFAULT));
 
       // Only return positive values. Admission control has a default from gflags.
-      int queueTimeoutMs = getLlamaPoolConfigValue(currentLlamaConf, pool,
-          QUEUE_TIMEOUT_KEY, -1);
+      long queueTimeoutMs = getLlamaPoolConfigValue(currentLlamaConf, pool,
+          QUEUE_TIMEOUT_KEY, -1L);
       if (queueTimeoutMs > 0) result.setQueue_timeout_ms(queueTimeoutMs);
       result.setDefault_query_options(getLlamaPoolConfigValue(currentLlamaConf, pool,
           QUERY_OPTIONS_KEY, ""));
+      result.setMax_query_mem_limit(getLlamaPoolConfigValue(currentLlamaConf, pool,
+          MAX_QUERY_MEM_LIMIT_BYTES, 0L));
+      result.setMin_query_mem_limit(getLlamaPoolConfigValue(currentLlamaConf, pool,
+          MIN_QUERY_MEM_LIMIT_BYTES, 0L));
+      result.setClamp_mem_limit_query_option(getLlamaPoolConfigValue(currentLlamaConf,
+          pool, CLAMP_MEM_LIMIT_QUERY_OPTION, true));
     }
     if (LOG.isTraceEnabled()) {
       LOG.debug("getPoolConfig(pool={}): max_mem_resources={}, max_requests={}, " +
@@ -392,10 +412,10 @@ public class RequestPoolService {
    * @param conf The Configuration to use, provided so the caller can ensure the same
    *        Configuration is used to look up multiple properties.
    */
-  private int getLlamaPoolConfigValue(Configuration conf, String pool, String key,
-      int defaultValue) {
-    return conf.getInt(String.format(LLAMA_PER_POOL_CONFIG_KEY_FORMAT, key, pool),
-        conf.getInt(key, defaultValue));
+  private long getLlamaPoolConfigValue(Configuration conf, String pool, String key,
+      long defaultValue) {
+    return conf.getLong(String.format(LLAMA_PER_POOL_CONFIG_KEY_FORMAT, key, pool),
+        conf.getLong(key, defaultValue));
   }
 
   /**
@@ -408,6 +428,15 @@ public class RequestPoolService {
   }
 
   /**
+   * Looks up the per-pool Boolean config from the llama Configuration. See above.
+   */
+  private boolean getLlamaPoolConfigValue(Configuration conf, String pool, String key,
+      boolean defaultValue) {
+    return conf.getBoolean(String.format(LLAMA_PER_POOL_CONFIG_KEY_FORMAT, key, pool),
+        conf.getBoolean(key, defaultValue));
+  }
+
+  /**
    * Resolves the actual pool to use via the allocation placement policy. The policy may
    * change the requested pool.
    *

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
index 622c7a9..a66d2dc 100644
--- a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
+++ b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
@@ -152,7 +152,7 @@ public class TestRequestPoolService {
   public void testPoolResolution() throws Exception {
     createPoolService(ALLOCATION_FILE, LLAMA_CONFIG_FILE);
     Assert.assertEquals("root.queueA", poolService_.assignToPool("root.queueA", "userA"));
-    Assert.assertNull(poolService_.assignToPool("queueC", "userA"));
+    Assert.assertNull(poolService_.assignToPool("queueD", "userA"));
   }
 
   @Test
@@ -204,6 +204,8 @@ public class TestRequestPoolService {
     checkPoolConfigResult("root.queueA", 10, 30, 1024 * ByteUnits.MEGABYTE,
         10000L, "mem_limit=1024m,query_timeout_s=10");
     checkPoolConfigResult("root.queueB", 5, 10, -1, 30000L, "mem_limit=1024m");
+    checkPoolConfigResult("root.queueC", 5, 10, 1024 * ByteUnits.MEGABYTE, 30000L,
+        "mem_limit=1024m", 1000, 10, false);
   }
 
   @Test
@@ -211,7 +213,7 @@ public class TestRequestPoolService {
     createPoolService(ALLOCATION_FILE_EMPTY, LLAMA_CONFIG_FILE_EMPTY);
     Assert.assertEquals("root.userA", poolService_.assignToPool("", "userA"));
     Assert.assertTrue(poolService_.hasAccess("root.userA", "userA"));
-    checkPoolConfigResult("root", -1, 200, -1);
+    checkPoolConfigResult("root", -1, 200, -1, null, "", 0 ,0, true);
   }
 
   @Ignore("IMPALA-4868") @Test
@@ -306,11 +308,15 @@ public class TestRequestPoolService {
    */
   private void checkPoolConfigResult(String pool, long expectedMaxRequests,
       long expectedMaxQueued, long expectedMaxMem, Long expectedQueueTimeoutMs,
-      String expectedQueryOptions) {
+      String expectedQueryOptions, long max_query_mem_limit, long min_query_mem_limit,
+      boolean clamp_mem_limit_query_option) {
     TPoolConfig expectedResult = new TPoolConfig();
     expectedResult.setMax_requests(expectedMaxRequests);
     expectedResult.setMax_queued(expectedMaxQueued);
     expectedResult.setMax_mem_resources(expectedMaxMem);
+    expectedResult.setMax_query_mem_limit(max_query_mem_limit);
+    expectedResult.setMin_query_mem_limit(min_query_mem_limit);
+    expectedResult.setClamp_mem_limit_query_option(clamp_mem_limit_query_option);
     if (expectedQueueTimeoutMs != null) {
       expectedResult.setQueue_timeout_ms(expectedQueueTimeoutMs);
     }
@@ -322,6 +328,13 @@ public class TestRequestPoolService {
   }
 
   private void checkPoolConfigResult(String pool, long expectedMaxRequests,
+      long expectedMaxQueued, long expectedMaxMem, Long expectedQueueTimeoutMs,
+      String expectedQueryOptions) {
+    checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued,
+        expectedMaxMem, expectedQueueTimeoutMs, expectedQueryOptions, 0, 0, true);
+  }
+
+  private void checkPoolConfigResult(String pool, long expectedMaxRequests,
       long expectedMaxQueued, long expectedMaxMemUsage) {
     checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued,
         expectedMaxMemUsage, null, "");

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/test/resources/fair-scheduler-test.xml
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/fair-scheduler-test.xml b/fe/src/test/resources/fair-scheduler-test.xml
index f8536bf..9d3dafd 100644
--- a/fe/src/test/resources/fair-scheduler-test.xml
+++ b/fe/src/test/resources/fair-scheduler-test.xml
@@ -8,6 +8,10 @@
     <queue name="queueB">
       <aclSubmitApps>userB root</aclSubmitApps>
     </queue>
+    <queue name="queueC">
+      <aclSubmitApps>* </aclSubmitApps>
+      <maxResources>1024 mb, 0 vcores</maxResources>
+    </queue>
     <aclSubmitApps> </aclSubmitApps>
   </queue>
   <queuePlacementPolicy>

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/test/resources/llama-site-test.xml
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/llama-site-test.xml b/fe/src/test/resources/llama-site-test.xml
index e66da83..3738705 100644
--- a/fe/src/test/resources/llama-site-test.xml
+++ b/fe/src/test/resources/llama-site-test.xml
@@ -43,4 +43,16 @@
     <name>impala.admission-control.pool-default-query-options.root.queueA</name>
     <value>mem_limit=1024m,query_timeout_s=10</value>
   </property>
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.queueC</name>
+    <value>1000</value>
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.queueC</name>
+    <value>10</value>
+  </property>
+  <property>
+    <name>impala.admission-control.clamp-mem-limit-query-option.root.queueC</name>
+    <value>false</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/test/resources/mem-limit-test-fair-scheduler.xml
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/mem-limit-test-fair-scheduler.xml b/fe/src/test/resources/mem-limit-test-fair-scheduler.xml
new file mode 100644
index 0000000..3471642
--- /dev/null
+++ b/fe/src/test/resources/mem-limit-test-fair-scheduler.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0"?>
+<allocations>
+  <queue name="root">
+    <queue name="regularPoolWithoutClamping">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="poolLowMinLimit">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="poolLowMaxLimit">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="regularPool">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="regularPoolNoMinLimit">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="poolNoMemLimits">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="maxLessThanMinLimit">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="maxMemLessThanMinLimit">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="invalidTestPool">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <aclSubmitApps> </aclSubmitApps>
+  </queue>
+  <queuePlacementPolicy>
+    <rule name="specified"/>
+    <rule name="default" queue="root.regularPool"/>
+  </queuePlacementPolicy>
+</allocations>

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/test/resources/mem-limit-test-llama-site.xml
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/mem-limit-test-llama-site.xml b/fe/src/test/resources/mem-limit-test-llama-site.xml
new file mode 100644
index 0000000..2c48388
--- /dev/null
+++ b/fe/src/test/resources/mem-limit-test-llama-site.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+  <!--regularPoolWithoutClamping pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.regularPoolWithoutClamping</name>
+    <value>1610612736</value><!--1.5GB-->
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.regularPoolWithoutClamping</name>
+    <value>104857600</value><!--100MB-->
+  </property>
+  <property>
+    <name>impala.admission-control.clamp-mem-limit-query-option.root.regularPoolWithoutClamping</name>
+    <value>false</value>
+  </property>
+  <!--poolLowMinLimit pool config-->
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.poolLowMinLimit</name>
+    <value>26214400</value><!--25MB-->
+  </property>
+  <!--poolLowMaxLimit pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.poolLowMaxLimit</name>
+    <value>26214400</value><!--25MB-->
+  </property>
+  <!--regularPool pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.regularPool</name>
+    <value>1610612736</value><!--1.5GB-->
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.regularPool</name>
+    <value>52428800</value><!--50MB-->
+  </property>
+  <property>
+    <name>impala.admission-control.clamp-mem-limit-query-option.root.regularPool</name>
+    <value>true</value>
+  </property>
+  <!--regularPoolNoMinLimit pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.regularPoolNoMinLimit</name>
+    <value>1610612736</value><!--1.5GB-->
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.regularPoolNoMinLimit</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>impala.admission-control.clamp-mem-limit-query-option.root.regularPoolNoMinLimit</name>
+    <value>true</value>
+  </property>
+  <!--poolNoMemLimits pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.poolNoMemLimits</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.poolNoMemLimits</name>
+    <value>0</value>
+  </property>
+  <!--maxLessThanMinLimit pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.maxLessThanMinLimit</name>
+    <value>100000</value>
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.maxLessThanMinLimit</name>
+    <value>100001</value>
+  </property>
+  <!--maxMemLessThanMinLimit pool config-->
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.maxMemLessThanMinLimit</name>
+    <value>2621440001</value><!--2500MB + 1B-->
+  </property>
+  <!--invalidTestPool pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.invalidTestPool</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.invalidTestPool</name>
+    <value>26214400</value>
+  </property>
+  <property>
+    <name>llama.am.throttling.maximum.placed.reservations.root.invalidTestPool</name>
+    <value>1</value>
+  </property>
+</configuration>