Posted to commits@impala.apache.org by ar...@apache.org on 2023/03/13 08:08:50 UTC

[impala] branch master updated: IMPALA-11858: Cap per backend memory estimate to its memory limit for admission

This is an automated email from the ASF dual-hosted git repository.

arawat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new c810c51fa IMPALA-11858: Cap per backend memory estimate to its memory limit for admission
c810c51fa is described below

commit c810c51fa70e2718df76f91c861ebe3ff07da428
Author: Abhishek Rawat <ar...@cloudera.com>
AuthorDate: Thu Feb 9 13:21:06 2023 -0800

    IMPALA-11858: Cap per backend memory estimate to its memory limit for admission
    
    The admission controller caps memory estimates for a given query at
    the host's physical memory. The estimates should instead be capped at
    the backend's memory limit for admission, which is computed during
    daemon initialization in ExecEnv::Init().
    
    With this patch, for a given query schedule, the coordinator
    backend's memory limit is used to cap the memory to admit on the
    coordinator, and the minimum of all executor backends' memory limits
    is used to cap the memory to admit on the executors. A config option
    'clamp_query_mem_limit_backend_mem_limit' is also added to revert to
    the old behavior, where queries requesting more memory than a
    backend's admission limit are rejected.
    
    The memory requested by a query when MEM_LIMIT or MEM_LIMIT_EXECUTORS is
    set is also capped to the memory limit for admission on the backends.
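
The capping rule, in effect (a minimal sketch with illustrative names, not
the actual Impala functions; see UpdateMemoryRequirements in the diff below):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    // Cap the coordinator's memory to admit at the coordinator backend's
    // admission limit.
    int64_t CapCoordMem(int64_t coord_mem_to_admit, int64_t coord_admit_limit) {
      return std::min(coord_mem_to_admit, coord_admit_limit);
    }

    // Cap each executor's memory to admit at the minimum admission limit
    // across all executor backends in the group.
    int64_t CapExecutorMem(int64_t per_backend_mem_to_admit,
        const std::vector<int64_t>& executor_admit_limits) {
      int64_t capped = per_backend_mem_to_admit;
      for (int64_t limit : executor_admit_limits) {
        capped = std::min(capped, limit);
      }
      return capped;
    }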
    
    Also fixed an issue with excessive logging in query profiles when
    using the global admission controller: if a query was queued, the
    remote admission control client logged a 'Queued' status in the
    profile every time it polled the query status, even when the status
    had not changed.
    
    Testing:
    - Updated existing unit tests in admission-controller-test.cc
    - Added new checks in existing tests in executor-group-test.cc
    - Updated custom_cluster tests in test_admission_controller.py
    - Ran exhaustive tests
    
    Change-Id: I3b1f6e530785ef832dbc831d7cc6793133f3335c
    Reviewed-on: http://gerrit.cloudera.org:8080/19533
    Reviewed-by: Abhishek Rawat <ar...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/admission-controller-test.cc     | 91 ++++++++++++++++++----
 be/src/scheduling/admission-controller.cc          | 82 ++++++++++++-------
 be/src/scheduling/cluster-membership-test-util.cc  | 13 ++--
 be/src/scheduling/cluster-membership-test-util.h   | 12 ++-
 be/src/scheduling/executor-group-test.cc           | 49 +++++++++---
 be/src/scheduling/executor-group.cc                | 31 +++++++-
 be/src/scheduling/executor-group.h                 | 12 +++
 .../scheduling/remote-admission-control-client.cc  |  7 +-
 be/src/scheduling/schedule-state.cc                | 38 +++++----
 be/src/scheduling/schedule-state.h                 |  7 +-
 be/src/scheduling/scheduler-test-util.cc           |  1 +
 tests/custom_cluster/test_admission_controller.py  | 48 +++++++++---
 12 files changed, 299 insertions(+), 92 deletions(-)

diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index 85e2073e4..f2d01ba15 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -24,6 +24,7 @@
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/test-env.h"
+#include "scheduling/cluster-membership-test-util.h"
 #include "scheduling/schedule-state.h"
 #include "service/impala-server.h"
 #include "testutil/gtest-util.h"
@@ -33,8 +34,10 @@
 // Access the flags that are defined in RequestPoolService.
 DECLARE_string(fair_scheduler_allocation_path);
 DECLARE_string(llama_site_path);
+DECLARE_bool(clamp_query_mem_limit_backend_mem_limit);
 
 namespace impala {
+using namespace impala::test;
 
 static const string IMPALA_HOME(getenv("IMPALA_HOME"));
 
@@ -50,9 +53,6 @@ static const string HOST_0 = "host0:25000";
 static const string HOST_1 = "host1:25000";
 static const string HOST_2 = "host2:25000";
 
-static const int64_t MEGABYTE = 1024L * 1024L;
-static const int64_t GIGABYTE = 1024L * MEGABYTE;
-
 // The default version of the heavy memory query list.
 static std::vector<THeavyMemoryQuery> empty_heavy_memory_query_list;
 
@@ -63,6 +63,8 @@ static std::vector<THeavyMemoryQuery> empty_heavy_memory_query_list;
 /// taking the admission_ctrl_lock_ lock.
 class AdmissionControllerTest : public testing::Test {
  protected:
+  typedef std::pair<ExecutorGroup, BackendDescriptorPB> ExecutorGroupCoordinatorPair;
+
   boost::scoped_ptr<TestEnv> test_env_;
 
   // Pool for objects to be destroyed during test teardown.
@@ -104,7 +106,10 @@ class AdmissionControllerTest : public testing::Test {
         pool_.Add(new ScheduleState(*query_id, *request, *query_options, profile, true));
     schedule_state->set_executor_group(executor_group);
     SetHostsInScheduleState(*schedule_state, num_hosts, is_dedicated_coord);
-    schedule_state->UpdateMemoryRequirements(config);
+    ExecutorGroupCoordinatorPair group = MakeExecutorConfig(*schedule_state);
+    schedule_state->UpdateMemoryRequirements(config,
+        group.second.admit_mem_limit(),
+        group.first.GetPerExecutorMemLimitForAdmission());
     return schedule_state;
   }
 
@@ -116,6 +121,27 @@ class AdmissionControllerTest : public testing::Test {
         per_host_mem_estimate, per_host_mem_estimate, false, executor_group);
   }
 
+  /// Create ExecutorGroup and BackendDescriptor for Coordinator for the given
+  /// ScheduleState.
+  ExecutorGroupCoordinatorPair MakeExecutorConfig(const ScheduleState& schedule_state) {
+    BackendDescriptorPB coord_desc;
+    ExecutorGroup exec_group(schedule_state.executor_group());
+    const PerBackendScheduleStates& per_backend_schedule_states =
+        schedule_state.per_backend_schedule_states();
+    bool has_coord_backend = false;
+    for (const auto& itr : per_backend_schedule_states) {
+      if (itr.second.exec_params->is_coord_backend()) {
+        coord_desc = itr.second.be_desc;
+        has_coord_backend = true;
+      }
+      if (itr.second.be_desc.is_executor()) {
+        exec_group.AddExecutor(itr.second.be_desc);
+      }
+    }
+    DCHECK(has_coord_backend) << "Query schedule must have a coordinator backend";
+    return std::make_pair(exec_group, coord_desc);
+  }
+
   /// Replace the per-backend hosts in the schedule with one having 'count' hosts.
   /// Note: no FInstanceExecParams are added so
   /// ScheduleState::UseDedicatedCoordEstimates() would consider this schedule as not
@@ -123,7 +149,7 @@ class AdmissionControllerTest : public testing::Test {
   /// if a dedicated coordinator backend exists.
   void SetHostsInScheduleState(ScheduleState& schedule_state, const int count,
       bool is_dedicated_coord, int64_t min_mem_reservation_bytes = 0,
-      int64_t admit_mem_limit = 200L * MEGABYTE, int slots_to_use = 1,
+      int64_t admit_mem_limit = 4096L * MEGABYTE, int slots_to_use = 1,
       int slots_available = 8) {
     schedule_state.ClearBackendScheduleStates();
     for (int i = 0; i < count; ++i) {
@@ -137,6 +163,7 @@ class AdmissionControllerTest : public testing::Test {
       backend_schedule_state.be_desc.set_admit_mem_limit(admit_mem_limit);
       backend_schedule_state.be_desc.set_admission_slots(slots_available);
       backend_schedule_state.be_desc.set_is_executor(true);
+      backend_schedule_state.be_desc.set_ip_address(test::HostIdxToIpAddr(i));
       *backend_schedule_state.be_desc.mutable_address() = host_addr;
       if (i == 0) {
         // Add first element as the coordinator.
@@ -324,7 +351,10 @@ TEST_F(AdmissionControllerTest, Simple) {
 
   // Create a ScheduleState to run on QUEUE_C.
   ScheduleState* schedule_state = MakeScheduleState(QUEUE_C, config_c, 1, 64L * MEGABYTE);
-  schedule_state->UpdateMemoryRequirements(config_c);
+  ExecutorGroupCoordinatorPair group = MakeExecutorConfig(*schedule_state);
+  schedule_state->UpdateMemoryRequirements(config_c,
+      group.second.admit_mem_limit(),
+      group.first.GetPerExecutorMemLimitForAdmission());
 
   // Check that the AdmissionController initially has no data about other hosts.
   ASSERT_EQ(0, admission_controller->host_stats_.size());
@@ -779,7 +809,10 @@ TEST_F(AdmissionControllerTest, DedicatedCoordScheduleState) {
   // For query only running on the coordinator, the per_backend_mem_to_admit should be 0.
   ScheduleState* schedule_state = MakeScheduleState(
       "default", 0, pool_config, 1, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
-  schedule_state->UpdateMemoryRequirements(pool_config);
+  ExecutorGroupCoordinatorPair group = MakeExecutorConfig(*schedule_state);
+  schedule_state->UpdateMemoryRequirements(pool_config,
+      group.second.admit_mem_limit(),
+      group.first.GetPerExecutorMemLimitForAdmission());
   ASSERT_EQ(0, schedule_state->per_backend_mem_to_admit());
   ASSERT_EQ(COORD_MEM_ESTIMATE, schedule_state->coord_backend_mem_to_admit());
 
@@ -789,7 +822,10 @@ TEST_F(AdmissionControllerTest, DedicatedCoordScheduleState) {
       "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
   ASSERT_EQ(COORD_MEM_ESTIMATE, schedule_state->GetDedicatedCoordMemoryEstimate());
   ASSERT_EQ(PER_EXEC_MEM_ESTIMATE, schedule_state->GetPerExecutorMemoryEstimate());
-  schedule_state->UpdateMemoryRequirements(pool_config);
+  ExecutorGroupCoordinatorPair group1 = MakeExecutorConfig(*schedule_state);
+  schedule_state->UpdateMemoryRequirements(pool_config,
+      group1.second.admit_mem_limit(),
+      group1.first.GetPerExecutorMemLimitForAdmission());
   ASSERT_EQ(PER_EXEC_MEM_ESTIMATE, schedule_state->per_backend_mem_to_admit());
   ASSERT_EQ(COORD_MEM_ESTIMATE, schedule_state->coord_backend_mem_to_admit());
   ASSERT_EQ(-1, schedule_state->per_backend_mem_limit());
@@ -803,7 +839,10 @@ TEST_F(AdmissionControllerTest, DedicatedCoordScheduleState) {
       "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
   ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
   pool_config.__set_min_query_mem_limit(700 * MEGABYTE);
-  schedule_state->UpdateMemoryRequirements(pool_config);
+  ExecutorGroupCoordinatorPair group2 = MakeExecutorConfig(*schedule_state);
+  schedule_state->UpdateMemoryRequirements(pool_config,
+      group2.second.admit_mem_limit(),
+      group2.first.GetPerExecutorMemLimitForAdmission());
   ASSERT_EQ(700 * MEGABYTE, schedule_state->per_backend_mem_to_admit());
   ASSERT_EQ(COORD_MEM_ESTIMATE, schedule_state->coord_backend_mem_to_admit());
   ASSERT_EQ(700 * MEGABYTE, schedule_state->per_backend_mem_limit());
@@ -818,7 +857,10 @@ TEST_F(AdmissionControllerTest, DedicatedCoordScheduleState) {
       ReservationUtil::GetMinMemLimitFromReservation(coord_min_reservation);
   pool_config.__set_min_query_mem_limit(700 * MEGABYTE);
   schedule_state->set_coord_min_reservation(200 * MEGABYTE);
-  schedule_state->UpdateMemoryRequirements(pool_config);
+  ExecutorGroupCoordinatorPair group3 = MakeExecutorConfig(*schedule_state);
+  schedule_state->UpdateMemoryRequirements(pool_config,
+      group3.second.admit_mem_limit(),
+      group3.first.GetPerExecutorMemLimitForAdmission());
   ASSERT_EQ(700 * MEGABYTE, schedule_state->per_backend_mem_to_admit());
   ASSERT_EQ(min_coord_mem_limit_required, schedule_state->coord_backend_mem_to_admit());
   ASSERT_EQ(700 * MEGABYTE, schedule_state->per_backend_mem_limit());
@@ -828,7 +870,10 @@ TEST_F(AdmissionControllerTest, DedicatedCoordScheduleState) {
   ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
   schedule_state = MakeScheduleState("default", GIGABYTE, pool_config, 2,
       PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
-  schedule_state->UpdateMemoryRequirements(pool_config);
+  ExecutorGroupCoordinatorPair group4 = MakeExecutorConfig(*schedule_state);
+  schedule_state->UpdateMemoryRequirements(pool_config,
+      group4.second.admit_mem_limit(),
+      group4.first.GetPerExecutorMemLimitForAdmission());
   ASSERT_EQ(GIGABYTE, schedule_state->per_backend_mem_to_admit());
   ASSERT_EQ(GIGABYTE, schedule_state->coord_backend_mem_to_admit());
   ASSERT_EQ(GIGABYTE, schedule_state->per_backend_mem_limit());
@@ -840,7 +885,10 @@ TEST_F(AdmissionControllerTest, DedicatedCoordScheduleState) {
       PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
   ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
   pool_config.__set_max_query_mem_limit(700 * MEGABYTE);
-  schedule_state->UpdateMemoryRequirements(pool_config);
+  ExecutorGroupCoordinatorPair group5 = MakeExecutorConfig(*schedule_state);
+  schedule_state->UpdateMemoryRequirements(pool_config,
+      group5.second.admit_mem_limit(),
+      group5.first.GetPerExecutorMemLimitForAdmission());
   ASSERT_EQ(700 * MEGABYTE, schedule_state->per_backend_mem_to_admit());
   ASSERT_EQ(700 * MEGABYTE, schedule_state->coord_backend_mem_to_admit());
   ASSERT_EQ(700 * MEGABYTE, schedule_state->per_backend_mem_limit());
@@ -876,6 +924,7 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   coord_exec_params.be_desc.set_admission_slots(8);
   coord_exec_params.be_desc.set_is_executor(false);
   coord_exec_params.be_desc.set_is_coordinator(true);
+  coord_exec_params.be_desc.set_ip_address(test::HostIdxToIpAddr(1));
   // Add executor backend.
   const string exec_host_name = Substitute("host$0", 2);
   NetworkAddressPB exec_addr = MakeNetworkAddressPB(exec_host_name, 25000);
@@ -887,12 +936,16 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   backend_schedule_state.be_desc.set_admit_mem_limit(GIGABYTE);
   backend_schedule_state.be_desc.set_admission_slots(8);
   backend_schedule_state.be_desc.set_is_executor(true);
+  backend_schedule_state.be_desc.set_ip_address(test::HostIdxToIpAddr(2));
   string not_admitted_reason;
   bool coordinator_resource_limited = false;
   // Test 1: coord's admit_mem_limit < executor's admit_mem_limit. Query should not
   // be rejected because query fits on both executor and coordinator. It should be
   // queued if there is not enough capacity.
-  schedule_state->UpdateMemoryRequirements(pool_config);
+  ExecutorGroupCoordinatorPair group = MakeExecutorConfig(*schedule_state);
+  schedule_state->UpdateMemoryRequirements(pool_config,
+      group.second.admit_mem_limit(),
+      group.first.GetPerExecutorMemLimitForAdmission());
   ASSERT_FALSE(admission_controller->RejectForSchedule(
       *schedule_state, pool_config, &not_admitted_reason));
   ASSERT_TRUE(admission_controller->HasAvailableMemResources(
@@ -931,6 +984,7 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   // Test 2: coord's admit_mem_limit < executor's admit_mem_limit. Query rejected because
   // coord's admit_mem_limit is less than mem_to_admit on the coord.
   // Re-using previous ScheduleState object.
+  FLAGS_clamp_query_mem_limit_backend_mem_limit = false;
   coord_exec_params.be_desc.set_admit_mem_limit(100 * MEGABYTE);
   ASSERT_TRUE(admission_controller->RejectForSchedule(
       *schedule_state, pool_config, &not_admitted_reason));
@@ -945,6 +999,7 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   ASSERT_TRUE(coordinator_resource_limited);
   coordinator_resource_limited = false;
   not_admitted_reason.clear();
+  FLAGS_clamp_query_mem_limit_backend_mem_limit = true;
 
   // Test 3: Check HasAvailableSlots by simulating slots being in use.
   ASSERT_TRUE(admission_controller->HasAvailableSlots(
@@ -977,7 +1032,10 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   schedule_state = MakeScheduleState(
       "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
   pool_config.__set_min_query_mem_limit(MEGABYTE); // to auto set mem_limit(s).
-  schedule_state->UpdateMemoryRequirements(pool_config);
+  ExecutorGroupCoordinatorPair group1 = MakeExecutorConfig(*schedule_state);
+  schedule_state->UpdateMemoryRequirements(pool_config,
+      group1.second.admit_mem_limit(),
+      group1.first.GetPerExecutorMemLimitForAdmission());
   schedule_state->set_largest_min_reservation(600 * MEGABYTE);
   schedule_state->set_coord_min_reservation(50 * MEGABYTE);
   ASSERT_TRUE(AdmissionController::CanAccommodateMaxInitialReservation(
@@ -1071,7 +1129,10 @@ TEST_F(AdmissionControllerTest, TopNQueryCheck) {
       long per_host_mem_estimate = (rand() % 10 + 1) * MEGABYTE;
       ScheduleState* schedule_state =
           MakeScheduleState(pool_name, pool_config, 1, per_host_mem_estimate);
-      schedule_state->UpdateMemoryRequirements(pool_config);
+      ExecutorGroupCoordinatorPair group = MakeExecutorConfig(*schedule_state);
+      schedule_state->UpdateMemoryRequirements(pool_config,
+          group.second.admit_mem_limit(),
+          group.first.GetPerExecutorMemLimitForAdmission());
       // Admit the query to the pool.
       string not_admitted_reason;
       bool coordinator_resource_limited = false;
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index f0f68e00c..5aa672062 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -60,6 +60,15 @@ DEFINE_int64_hidden(admission_control_stale_topic_threshold_ms, 5 * 1000,
     "capture most cases where the Impala daemon is disconnected from the statestore "
     "or topic updates are seriously delayed.");
 
+DEFINE_bool(clamp_query_mem_limit_backend_mem_limit, true, "Caps query memory limit to "
+    "memory limit for admission on the backends. The coordinator memory limit is capped "
+    "to the coordinator backend's memory limit, while executor memory limit is capped to "
+    "the effective or minimum memory limit for admission on executor backends. If the "
+    "flag is not set, a query requesting more than backend's memory limit for admission "
+    "gets rejected during admission. However, if this flag is set, such a query gets "
+    "admitted with backend's memory limit and could succeed if the memory request was "
+    "over estimated and could fail if query really needs more memory." );
+
 DECLARE_bool(is_coordinator);
 DECLARE_bool(is_executor);
 
@@ -1095,12 +1104,15 @@ bool AdmissionController::RejectForSchedule(
       max_thread_reservation =
           make_pair(&e.first, be_state.exec_params->thread_reservation());
     }
-    if (be_state.exec_params->is_coord_backend()) {
-      coord_admit_mem_limit.first = &e.first;
-      coord_admit_mem_limit.second = be_state.be_desc.admit_mem_limit();
-    } else if (be_state.be_desc.admit_mem_limit() < min_executor_admit_mem_limit.second) {
-      min_executor_admit_mem_limit.first = &e.first;
-      min_executor_admit_mem_limit.second = be_state.be_desc.admit_mem_limit();
+    if (!FLAGS_clamp_query_mem_limit_backend_mem_limit) {
+      if (be_state.exec_params->is_coord_backend()) {
+        coord_admit_mem_limit.first = &e.first;
+        coord_admit_mem_limit.second = be_state.be_desc.admit_mem_limit();
+      } else if (be_state.be_desc.admit_mem_limit() <
+            min_executor_admit_mem_limit.second) {
+        min_executor_admit_mem_limit.first = &e.first;
+        min_executor_admit_mem_limit.second = be_state.be_desc.admit_mem_limit();
+      }
     }
   }
 
@@ -1156,27 +1168,29 @@ bool AdmissionController::RejectForSchedule(
           PrintBytes(cluster_mem_to_admit), PrintBytes(max_mem));
       return true;
     }
-    int64_t executor_mem_to_admit = state.per_backend_mem_to_admit();
-    VLOG_ROW << "Checking executor mem with executor_mem_to_admit = "
-             << executor_mem_to_admit
-             << " and min_admit_mem_limit.second = "
-             << min_executor_admit_mem_limit.second;
-    if (executor_mem_to_admit > min_executor_admit_mem_limit.second) {
-      *rejection_reason =
-          Substitute(REASON_REQ_OVER_NODE_MEM, PrintBytes(executor_mem_to_admit),
-              PrintBytes(min_executor_admit_mem_limit.second),
-              NetworkAddressPBToString(*min_executor_admit_mem_limit.first));
-      return true;
-    }
-    int64_t coord_mem_to_admit = state.coord_backend_mem_to_admit();
-    VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
-             << coord_mem_to_admit
-             << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
-    if (coord_mem_to_admit > coord_admit_mem_limit.second) {
-      *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-          PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
-          NetworkAddressPBToString(*coord_admit_mem_limit.first));
-      return true;
+    if (!FLAGS_clamp_query_mem_limit_backend_mem_limit) {
+      int64_t executor_mem_to_admit = state.per_backend_mem_to_admit();
+      VLOG_ROW << "Checking executor mem with executor_mem_to_admit = "
+               << executor_mem_to_admit
+               << " and min_admit_mem_limit.second = "
+               << min_executor_admit_mem_limit.second;
+      if (executor_mem_to_admit > min_executor_admit_mem_limit.second) {
+        *rejection_reason =
+            Substitute(REASON_REQ_OVER_NODE_MEM, PrintBytes(executor_mem_to_admit),
+                PrintBytes(min_executor_admit_mem_limit.second),
+                NetworkAddressPBToString(*min_executor_admit_mem_limit.first));
+        return true;
+      }
+      int64_t coord_mem_to_admit = state.coord_backend_mem_to_admit();
+      VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
+               << coord_mem_to_admit
+               << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
+      if (coord_mem_to_admit > coord_admit_mem_limit.second) {
+        *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+            PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
+            NetworkAddressPBToString(*coord_admit_mem_limit.first));
+        return true;
+      }
     }
   }
   return false;
@@ -1899,10 +1913,22 @@ bool AdmissionController::FindGroupToAdmitOrReject(
     return true;
   }
 
+  // Get Coordinator Backend for the given admission request
+  const AdmissionRequest& request = queue_node->admission_request;
+  auto it = membership_snapshot->current_backends.find(PrintId(request.coord_id));
+  if (it == membership_snapshot->current_backends.end()) {
+    queue_node->not_admitted_reason = REASON_COORDINATOR_NOT_FOUND;
+    LOG(WARNING) << queue_node->not_admitted_reason;
+    return true;
+  }
+  const BackendDescriptorPB& coord_desc = it->second;
+
   for (GroupScheduleState& group_state : queue_node->group_states) {
     const ExecutorGroup& executor_group = group_state.executor_group;
     ScheduleState* state = group_state.state.get();
-    state->UpdateMemoryRequirements(pool_config);
+    state->UpdateMemoryRequirements(pool_config,
+        coord_desc.admit_mem_limit(),
+        executor_group.GetPerExecutorMemLimitForAdmission());
 
     const string& group_name = executor_group.name();
     int64_t group_size = executor_group.NumExecutors();
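
The flag added above selects between two behaviors for a request that
exceeds a backend's admission limit. A sketch of the decision, with assumed
names (not the actual control flow in RejectForSchedule):

    #include <algorithm>
    #include <cstdint>

    enum class Outcome { kAdmit, kAdmitClamped, kRejected };

    // clamp_flag mirrors -clamp_query_mem_limit_backend_mem_limit.
    Outcome HandleRequest(bool clamp_flag, int64_t* mem_to_admit,
        int64_t backend_admit_limit) {
      if (*mem_to_admit <= backend_admit_limit) return Outcome::kAdmit;
      if (clamp_flag) {
        // New default: clamp and admit. The query may still fail at runtime
        // if it genuinely needs more memory than the clamped limit.
        *mem_to_admit = std::min(*mem_to_admit, backend_admit_limit);
        return Outcome::kAdmitClamped;
      }
      // Old behavior: reject during admission.
      return Outcome::kRejected;
    }

Operationally, the old behavior can be restored by starting impalad with
-clamp_query_mem_limit_backend_mem_limit=false, as some of the updated
tests below do.
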
diff --git a/be/src/scheduling/cluster-membership-test-util.cc b/be/src/scheduling/cluster-membership-test-util.cc
index ba9912f0a..36c9e1ad5 100644
--- a/be/src/scheduling/cluster-membership-test-util.cc
+++ b/be/src/scheduling/cluster-membership-test-util.cc
@@ -46,7 +46,8 @@ string HostIdxToIpAddr(int host_idx) {
 }
 
 BackendDescriptorPB MakeBackendDescriptor(
-    int idx, const ExecutorGroupDescPB& group_desc, int port_offset) {
+    int idx, const ExecutorGroupDescPB& group_desc, int port_offset,
+    int64_t admit_mem_limit) {
   BackendDescriptorPB be_desc;
   UUIDToUniqueIdPB(boost::uuids::random_generator()(), be_desc.mutable_backend_id());
   be_desc.mutable_address()->set_hostname(HostIdxToHostname(idx));
@@ -58,23 +59,25 @@ BackendDescriptorPB MakeBackendDescriptor(
   be_desc.set_is_coordinator(true);
   be_desc.set_is_executor(true);
   be_desc.set_is_quiescing(false);
+  be_desc.set_admit_mem_limit(admit_mem_limit);
   *be_desc.add_executor_groups() = group_desc;
   return be_desc;
 }
 
 BackendDescriptorPB MakeBackendDescriptor(
-    int idx, const ExecutorGroup& group, int port_offset) {
+    int idx, const ExecutorGroup& group, int port_offset, int64_t admit_mem_limit) {
   ExecutorGroupDescPB group_desc;
   group_desc.set_name(group.name());
   group_desc.set_min_size(group.min_size());
-  return MakeBackendDescriptor(idx, group_desc, port_offset);
+  return MakeBackendDescriptor(idx, group_desc, port_offset, admit_mem_limit);
 }
 
-BackendDescriptorPB MakeBackendDescriptor(int idx, int port_offset) {
+BackendDescriptorPB MakeBackendDescriptor(int idx, int port_offset,
+    int64_t admit_mem_limit) {
   ExecutorGroupDescPB group_desc;
   group_desc.set_name(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
   group_desc.set_min_size(1);
-  return MakeBackendDescriptor(idx, group_desc, port_offset);
+  return MakeBackendDescriptor(idx, group_desc, port_offset, admit_mem_limit);
 }
 
 }  // end namespace test
diff --git a/be/src/scheduling/cluster-membership-test-util.h b/be/src/scheduling/cluster-membership-test-util.h
index 77871784e..63a016bda 100644
--- a/be/src/scheduling/cluster-membership-test-util.h
+++ b/be/src/scheduling/cluster-membership-test-util.h
@@ -27,6 +27,9 @@ class ExecutorGroupDescPB;
 
 namespace test {
 
+static const int64_t MEGABYTE = 1024L * 1024L;
+static const int64_t GIGABYTE = 1024L * MEGABYTE;
+
 /// Convert a host index to a hostname.
 std::string HostIdxToHostname(int host_idx);
 
@@ -40,14 +43,17 @@ std::string HostIdxToIpAddr(int host_idx);
 ///
 /// Make a backend descriptor for group 'group_desc'.
 BackendDescriptorPB MakeBackendDescriptor(
-    int idx, const ExecutorGroupDescPB& group_desc, int port_offset = 0);
+    int idx, const ExecutorGroupDescPB& group_desc, int port_offset = 0,
+    int64_t admit_mem_limit = 4L * MEGABYTE);
 
 /// Make a backend descriptor for 'group'.
 BackendDescriptorPB MakeBackendDescriptor(
-    int idx, const ExecutorGroup& group, int port_offset = 0);
+    int idx, const ExecutorGroup& group, int port_offset = 0,
+    int64_t admit_mem_limit = 4L * MEGABYTE);
 
 /// Make a backend descriptor for the default executor group.
-BackendDescriptorPB MakeBackendDescriptor(int idx, int port_offset = 0);
+BackendDescriptorPB MakeBackendDescriptor(int idx, int port_offset = 0,
+    int64_t admit_mem_limit = 4L * MEGABYTE);
 
 }  // end namespace test
 }  // end namespace impala
diff --git a/be/src/scheduling/executor-group-test.cc b/be/src/scheduling/executor-group-test.cc
index b47d44fc5..96a744136 100644
--- a/be/src/scheduling/executor-group-test.cc
+++ b/be/src/scheduling/executor-group-test.cc
@@ -30,8 +30,14 @@ using namespace impala::test;
 /// Test adding multiple backends on different hosts.
 TEST(ExecutorGroupTest, AddExecutors) {
   ExecutorGroup group1("group1");
-  group1.AddExecutor(MakeBackendDescriptor(1, group1));
-  group1.AddExecutor(MakeBackendDescriptor(2, group1));
+  ASSERT_EQ(0, group1.GetPerExecutorMemLimitForAdmission());
+  int64_t mem_limit_admission1 = 100L * MEGABYTE;
+  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0,
+      mem_limit_admission1));
+  int64_t mem_limit_admission2 = 120L * MEGABYTE;
+  group1.AddExecutor(MakeBackendDescriptor(2, group1, /* port_offset=*/0,
+      mem_limit_admission2));
+  ASSERT_EQ(mem_limit_admission1, group1.GetPerExecutorMemLimitForAdmission());
   ASSERT_EQ(2, group1.NumExecutors());
   IpAddr backend_ip;
   ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip));
@@ -43,8 +49,13 @@ TEST(ExecutorGroupTest, AddExecutors) {
 /// Test adding multiple backends on the same host.
 TEST(ExecutorGroupTest, MultipleExecutorsOnSameHost) {
   ExecutorGroup group1("group1");
-  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0));
-  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/1));
+  int64_t mem_limit_admission1 = 120L * MEGABYTE;
+  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0,
+      mem_limit_admission1));
+  int64_t mem_limit_admission2 = 100L * MEGABYTE;
+  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/1,
+      mem_limit_admission2));
+  ASSERT_EQ(mem_limit_admission2, group1.GetPerExecutorMemLimitForAdmission());
   IpAddr backend_ip;
   ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip));
   EXPECT_EQ("10.0.0.1", backend_ip);
@@ -55,9 +66,17 @@ TEST(ExecutorGroupTest, MultipleExecutorsOnSameHost) {
 /// Test removing a backend.
 TEST(ExecutorGroupTest, RemoveExecutor) {
   ExecutorGroup group1("group1");
-  group1.AddExecutor(MakeBackendDescriptor(1, group1));
-  group1.AddExecutor(MakeBackendDescriptor(2, group1));
-  group1.RemoveExecutor(MakeBackendDescriptor(2, group1));
+  int64_t mem_limit_admission1 = 120L * MEGABYTE;
+  const BackendDescriptorPB& executor1 = MakeBackendDescriptor(1, group1,
+      /* port_offset=*/0, mem_limit_admission1);
+  group1.AddExecutor(executor1);
+  int64_t mem_limit_admission2 = 100L * MEGABYTE;
+  const BackendDescriptorPB& executor2 = MakeBackendDescriptor(2, group1,
+      /* port_offset=*/0, mem_limit_admission2);
+  group1.AddExecutor(executor2);
+  ASSERT_EQ(mem_limit_admission2, group1.GetPerExecutorMemLimitForAdmission());
+  group1.RemoveExecutor(executor2);
+  ASSERT_EQ(mem_limit_admission1, group1.GetPerExecutorMemLimitForAdmission());
   IpAddr backend_ip;
   ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip));
   EXPECT_EQ("10.0.0.1", backend_ip);
@@ -67,14 +86,24 @@ TEST(ExecutorGroupTest, RemoveExecutor) {
 /// Test removing one of multiple backends on the same host (IMPALA-3944).
 TEST(ExecutorGroupTest, RemoveExecutorOnSameHost) {
   ExecutorGroup group1("group1");
-  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0));
-  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/1));
-  group1.RemoveExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/1));
+  int64_t mem_limit_admission1 = 100L * MEGABYTE;
+  const BackendDescriptorPB& executor1 = MakeBackendDescriptor(1, group1,
+      /* port_offset=*/0, mem_limit_admission1);
+  group1.AddExecutor(executor1);
+  int64_t mem_limit_admission2 = 120L * MEGABYTE;
+  const BackendDescriptorPB& executor2 = MakeBackendDescriptor(1, group1,
+      /* port_offset=*/1, mem_limit_admission2);
+  group1.AddExecutor(executor2);
+  ASSERT_EQ(mem_limit_admission1, group1.GetPerExecutorMemLimitForAdmission());
+  group1.RemoveExecutor(executor2);
+  ASSERT_EQ(mem_limit_admission1, group1.GetPerExecutorMemLimitForAdmission());
   IpAddr backend_ip;
   ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip));
   EXPECT_EQ("10.0.0.1", backend_ip);
   const ExecutorGroup::Executors& backend_list = group1.GetExecutorsForHost("10.0.0.1");
   EXPECT_EQ(1, backend_list.size());
+  group1.RemoveExecutor(executor1);
+  ASSERT_EQ(0, group1.GetPerExecutorMemLimitForAdmission());
 }
 
 /// Test that exercises the size-based group health check.
diff --git a/be/src/scheduling/executor-group.cc b/be/src/scheduling/executor-group.cc
index 3ac6bd68d..5b711dc37 100644
--- a/be/src/scheduling/executor-group.cc
+++ b/be/src/scheduling/executor-group.cc
@@ -29,7 +29,8 @@ static const uint32_t NUM_HASH_RING_REPLICAS = 25;
 ExecutorGroup::ExecutorGroup(string name) : ExecutorGroup(name, 1) {}
 
 ExecutorGroup::ExecutorGroup(string name, int64_t min_size)
-  : name_(name), min_size_(min_size), executor_ip_hash_ring_(NUM_HASH_RING_REPLICAS) {
+  : name_(name), min_size_(min_size), executor_ip_hash_ring_(NUM_HASH_RING_REPLICAS),
+    per_executor_admit_mem_limit_(0) {
   DCHECK_GT(min_size_, 0);
 }
 
@@ -96,6 +97,15 @@ void ExecutorGroup::AddExecutor(const BackendDescriptorPB& be_desc) {
   }
   be_descs.push_back(be_desc);
   executor_ip_map_[be_desc.address().hostname()] = be_desc.ip_address();
+
+  DCHECK(be_desc.admit_mem_limit() > 0) << "Admit memory limit must be set for backends";
+  if (per_executor_admit_mem_limit_ > 0) {
+    per_executor_admit_mem_limit_ =
+        std::min(be_desc.admit_mem_limit(), per_executor_admit_mem_limit_);
+  } else if (per_executor_admit_mem_limit_ == 0) {
+    per_executor_admit_mem_limit_ = be_desc.admit_mem_limit();
+  }
+
   if (be_desc.ip_address() == "127.0.0.1") {
     // Include localhost as an alias for filesystems that don't translate it.
     LOG(INFO) << "Adding executor localhost alias for "
@@ -125,6 +135,9 @@ void ExecutorGroup::RemoveExecutor(const BackendDescriptorPB& be_desc) {
     return;
   }
   be_descs.erase(remove_it);
+  if (per_executor_admit_mem_limit_ == be_desc.admit_mem_limit()) {
+    CalculatePerExecutorMemLimitForAdmission();
+  }
   if (be_descs.empty()) {
     executor_map_.erase(be_descs_it);
     executor_ip_map_.erase(be_desc.address().hostname());
@@ -193,4 +206,20 @@ bool ExecutorGroup::CheckConsistencyOrWarn(const BackendDescriptorPB& be_desc) c
   return true;
 }
 
+void ExecutorGroup::CalculatePerExecutorMemLimitForAdmission() {
+  per_executor_admit_mem_limit_ = numeric_limits<int64_t>::max();
+  int num_executors = 0;
+  for (const auto& executor_list: executor_map_) {
+    const Executors& be_descs = executor_list.second;
+    num_executors += be_descs.size();
+    for (const auto& be_desc: be_descs) {
+      per_executor_admit_mem_limit_ = std::min(per_executor_admit_mem_limit_,
+          be_desc.admit_mem_limit());
+    }
+  }
+  if (num_executors == 0) {
+    per_executor_admit_mem_limit_ = 0;
+  }
+}
+
 }  // end ns impala
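
ExecutorGroup maintains the group-wide minimum incrementally: AddExecutor
tightens per_executor_admit_mem_limit_ in O(1), and RemoveExecutor only
recomputes when the departing backend's limit equals the current minimum.
The same pattern in isolation (an illustrative sketch, not the Impala class):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    class MinTracker {
     public:
      void Add(int64_t v) {
        values_.push_back(v);
        min_ = (min_ == 0) ? v : std::min(min_, v);  // 0 means "empty"
      }
      void Remove(int64_t v) {
        auto it = std::find(values_.begin(), values_.end(), v);
        if (it == values_.end()) return;
        values_.erase(it);
        // Only removing the current minimum can change the answer.
        if (v == min_) Recompute();
      }
      int64_t Min() const { return min_; }
     private:
      void Recompute() {
        min_ = values_.empty() ? 0 :
            *std::min_element(values_.begin(), values_.end());
      }
      std::vector<int64_t> values_;
      int64_t min_ = 0;
    };
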
diff --git a/be/src/scheduling/executor-group.h b/be/src/scheduling/executor-group.h
index a191f29ec..208120d09 100644
--- a/be/src/scheduling/executor-group.h
+++ b/be/src/scheduling/executor-group.h
@@ -112,6 +112,11 @@ class ExecutorGroup {
   /// Returns false otherwise.
   bool IsHealthy() const;
 
+  /// Return effective memory limit for admission for executors in this group.
+  int64_t GetPerExecutorMemLimitForAdmission() const {
+      return per_executor_admit_mem_limit_;
+  }
+
   const string& name() const { return name_; }
   int64_t min_size() const { return min_size_; }
 
@@ -121,6 +126,9 @@ class ExecutorGroup {
   /// found. Returns false and logs a warning otherwise.
   bool CheckConsistencyOrWarn(const BackendDescriptorPB& be_desc) const;
 
+  /// Calculates minimum memory limit for admission across all backends.
+  void CalculatePerExecutorMemLimitForAdmission();
+
   const std::string name_;
 
   /// The minimum number of executors in this group to be considered healthy. A group must
@@ -140,6 +148,10 @@ class ExecutorGroup {
   /// All executors are kept in a hash ring to allow a consistent mapping from filenames
   /// to executors.
   HashRing executor_ip_hash_ring_;
+
+  /// The minimum memory limit in bytes for admission across all backends. Used by
+  /// admission controller to bound the per backend memory limit for a given query.
+  int64_t per_executor_admit_mem_limit_;
 };
 
 }  // end ns impala
diff --git a/be/src/scheduling/remote-admission-control-client.cc b/be/src/scheduling/remote-admission-control-client.cc
index 7deba6311..9e877f6e4 100644
--- a/be/src/scheduling/remote-admission-control-client.cc
+++ b/be/src/scheduling/remote-admission-control-client.cc
@@ -144,6 +144,7 @@ Status RemoteAdmissionControlClient::SubmitForAdmission(
   KUDU_RETURN_IF_ERROR(admit_rpc_status, "AdmitQuery rpc failed");
   RETURN_IF_ERROR(admit_status);
 
+  bool is_query_queued = false;
   while (true) {
     RpcController rpc_controller2;
     GetQueryStatusRequestPB get_status_req;
@@ -169,7 +170,11 @@ Status RemoteAdmissionControlClient::SubmitForAdmission(
     if (!admit_status.ok()) {
       break;
     }
-    query_events->MarkEvent(QUERY_EVENT_QUEUED);
+
+    if (!is_query_queued) {
+      query_events->MarkEvent(QUERY_EVENT_QUEUED);
+      is_query_queued = true;
+    }
 
     SleepForMs(FLAGS_admission_status_retry_time_ms);
   }
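
The profile fix above is an "emit once" guard around the polling loop. The
same idea in isolation (poll and the event vector are hypothetical stand-ins
for the RPC and profile calls):

    #include <functional>
    #include <string>
    #include <vector>

    enum class QueryState { kQueued, kAdmitted };

    void WaitForAdmission(const std::function<QueryState()>& poll,
        std::vector<std::string>* profile_events) {
      bool queued_marked = false;
      while (poll() != QueryState::kAdmitted) {
        if (!queued_marked) {
          // Recorded once, no matter how many times we poll.
          profile_events->push_back("Queued");
          queued_marked = true;
        }
      }
    }
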
diff --git a/be/src/scheduling/schedule-state.cc b/be/src/scheduling/schedule-state.cc
index e9243c2f3..aa69c91fd 100644
--- a/be/src/scheduling/schedule-state.cc
+++ b/be/src/scheduling/schedule-state.cc
@@ -25,6 +25,8 @@
 
 #include "common/names.h"
 
+DECLARE_bool(clamp_query_mem_limit_backend_mem_limit);
+
 DEFINE_bool_hidden(use_dedicated_coordinator_estimates, true,
     "Hidden option to fall back to legacy memory estimation logic for dedicated"
     " coordinators wherein the same per backend estimate is used for both coordinators "
@@ -191,7 +193,7 @@ void ScheduleState::Validate() const {
     DCHECK_GT(fragment_state.instance_states.size(), 0) << fragment_state.fragment;
   }
 
-  // Check that all backends have instances, except possibly the coordaintor backend.
+  // Check that all backends have instances, except possibly the coordinator backend.
   for (const auto& elem : per_backend_schedule_states_) {
     const BackendExecParamsPB* be_params = elem.second.exec_params;
     DCHECK(!be_params->instance_params().empty() || be_params->is_coord_backend());
@@ -256,13 +258,14 @@ bool ScheduleState::UseDedicatedCoordEstimates() const {
   return false;
 }
 
-void ScheduleState::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
+void ScheduleState::UpdateMemoryRequirements(const TPoolConfig& pool_cfg,
+    int64_t coord_mem_limit_admission, int64_t executor_mem_limit_admission) {
   // If the min_query_mem_limit and max_query_mem_limit are not set in the pool config
   // then it falls back to traditional(old) behavior, which means that, it sets the
   // mem_limit if it is set in the query options, else sets it to -1 (no limit).
-  bool mimic_old_behaviour =
+  const bool mimic_old_behaviour =
       pool_cfg.min_query_mem_limit == 0 && pool_cfg.max_query_mem_limit == 0;
-  bool use_dedicated_coord_estimates = UseDedicatedCoordEstimates();
+  const bool use_dedicated_coord_estimates = UseDedicatedCoordEstimates();
 
   int64_t per_backend_mem_to_admit = 0;
   int64_t coord_backend_mem_to_admit = 0;
@@ -312,18 +315,28 @@ void ScheduleState::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     }
   }
 
-  // Cap the memory estimate at the amount of physical memory available. The user's
-  // provided value or the estimate from planning can each be unreasonable.
-  per_backend_mem_to_admit = min(per_backend_mem_to_admit, MemInfo::physical_mem());
-  coord_backend_mem_to_admit = min(coord_backend_mem_to_admit, MemInfo::physical_mem());
+  // Enforce the MEM_LIMIT_EXECUTORS query option if MEM_LIMIT is not specified.
+  const bool is_mem_limit_executors_set = query_options().__isset.mem_limit_executors
+      && query_options().mem_limit_executors > 0;
+  if (!is_mem_limit_set && is_mem_limit_executors_set) {
+    per_backend_mem_to_admit = query_options().mem_limit_executors;
+  }
 
+  // Cap the memory estimate at the backend's memory limit for admission. The user's
+  // provided value or the estimate from planning can each be unreasonable.
+  if (FLAGS_clamp_query_mem_limit_backend_mem_limit) {
+    per_backend_mem_to_admit =
+        min(per_backend_mem_to_admit, executor_mem_limit_admission);
+    coord_backend_mem_to_admit =
+        min(coord_backend_mem_to_admit, coord_mem_limit_admission);
+  }
   // If the query is only scheduled to run on the coordinator.
   if (per_backend_schedule_states_.size() == 1 && RequiresCoordinatorFragment()) {
     per_backend_mem_to_admit = 0;
   }
 
   int64_t per_backend_mem_limit;
-  if (mimic_old_behaviour && !is_mem_limit_set) {
+  if (mimic_old_behaviour && !is_mem_limit_set && !is_mem_limit_executors_set) {
     per_backend_mem_limit = -1;
     query_schedule_pb_->set_coord_backend_mem_limit(-1);
   } else {
@@ -331,13 +344,6 @@ void ScheduleState::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     query_schedule_pb_->set_coord_backend_mem_limit(coord_backend_mem_to_admit);
   }
 
-  // Finally, enforce the MEM_LIMIT_EXECUTORS query option if MEM_LIMIT is not specified.
-  if (!is_mem_limit_set && query_options().__isset.mem_limit_executors
-      && query_options().mem_limit_executors > 0) {
-    per_backend_mem_to_admit = query_options().mem_limit_executors;
-    per_backend_mem_limit = per_backend_mem_to_admit;
-  }
-
   query_schedule_pb_->set_coord_backend_mem_to_admit(coord_backend_mem_to_admit);
   query_schedule_pb_->set_per_backend_mem_limit(per_backend_mem_limit);
   query_schedule_pb_->set_per_backend_mem_to_admit(per_backend_mem_to_admit);
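
Note the ordering change in UpdateMemoryRequirements: MEM_LIMIT_EXECUTORS is
now applied before the clamp, so an executor memory request above the
admission limit is clamped like any other value. Roughly (a sketch with
assumed inputs; 0 means "unset"):

    #include <algorithm>
    #include <cstdint>

    int64_t PerBackendMemToAdmit(int64_t mem_limit, int64_t mem_limit_executors,
        int64_t planner_estimate, int64_t executor_admit_limit,
        bool clamp_to_backend_limit) {
      int64_t mem = planner_estimate;
      if (mem_limit > 0) {
        mem = mem_limit;                 // MEM_LIMIT wins if set
      } else if (mem_limit_executors > 0) {
        mem = mem_limit_executors;       // then MEM_LIMIT_EXECUTORS
      }
      if (clamp_to_backend_limit) {
        mem = std::min(mem, executor_admit_limit);  // clamp last
      }
      return mem;
    }
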
diff --git a/be/src/scheduling/schedule-state.h b/be/src/scheduling/schedule-state.h
index 50e9af012..80ad37798 100644
--- a/be/src/scheduling/schedule-state.h
+++ b/be/src/scheduling/schedule-state.h
@@ -287,8 +287,11 @@ class ScheduleState {
   /// Populates or updates the per host query memory limit and the amount of memory to be
   /// admitted based on the pool configuration passed to it. Must be called at least once
   /// before making any calls to per_backend_mem_to_admit(), per_backend_mem_limit() and
-  /// GetClusterMemoryToAdmit().
-  void UpdateMemoryRequirements(const TPoolConfig& pool_cfg);
+  /// GetClusterMemoryToAdmit(). If 'clamp_query_mem_limit_backend_mem_limit' is set, the
+  /// input 'coord_mem_limit_admission' and 'executor_mem_limit_admission' are used for
+  /// capping query memory limit on coordinator and executor backends, respectively.
+  void UpdateMemoryRequirements(const TPoolConfig& pool_cfg,
+      int64_t coord_mem_limit_admission, int64_t executor_mem_limit_admission);
 
   const std::string& executor_group() const { return executor_group_; }
 
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 8f0ea1e76..14e7eb620 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -102,6 +102,7 @@ ClusterMembershipMgr::BeDescSharedPtr BuildBackendDescriptor(const Host& host) {
   be_desc->set_is_coordinator(host.is_coordinator);
   be_desc->set_is_executor(host.is_executor);
   be_desc->set_is_quiescing(false);
+  be_desc->set_admit_mem_limit(GIGABYTE);
   ExecutorGroupDescPB* exec_desc = be_desc->add_executor_groups();
   exec_desc->set_name(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
   exec_desc->set_min_size(1);
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index ab8068412..654ca7e6f 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -489,7 +489,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
-          pool_max_mem=10 * PROC_MEM_TEST_LIMIT, proc_mem_limit=PROC_MEM_TEST_LIMIT),
+      pool_max_mem=10 * PROC_MEM_TEST_LIMIT, proc_mem_limit=PROC_MEM_TEST_LIMIT)
+      + " -clamp_query_mem_limit_backend_mem_limit=false",
       num_exclusive_coordinators=1)
   def test_mem_limit_dedicated_coordinator(self, vector):
     """Regression test for IMPALA-8469: coordinator fragment should be admitted on
@@ -510,6 +511,32 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
               "1.10 GB is greater than memory available for admission 1.00 GB" in
               str(ex)), str(ex)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
+      pool_max_mem=10 * PROC_MEM_TEST_LIMIT, proc_mem_limit=PROC_MEM_TEST_LIMIT)
+      + " -clamp_query_mem_limit_backend_mem_limit=true",
+      num_exclusive_coordinators=1,
+      cluster_size=2)
+  def test_clamp_query_mem_limit_backend_mem_limit_flag(self, vector):
+    """If a query requests more memory than backend's memory limit for admission, the
+    query gets admitted with the max memory for admission on backend."""
+    query = "select * from functional.alltypesagg limit 10"
+    exec_options = vector.get_value('exec_option')
+    # Requested mem_limit is more than the memory limit for admission on backends.
+    # mem_limit will be clamped to the mem limit for admission on backends.
+    exec_options['mem_limit'] = int(self.PROC_MEM_TEST_LIMIT * 1.1)
+    result = self.execute_query_expect_success(self.client, query, exec_options)
+    assert "Cluster Memory Admitted: 2.00 GB" in str(result.runtime_profile), \
+           str(result.runtime_profile)
+    # Request mem_limit more than memory limit for admission on executors. Executor's
+    # memory limit will be clamped to the mem limit for admission on executor.
+    exec_options['mem_limit'] = 0
+    exec_options['mem_limit_executors'] = int(self.PROC_MEM_TEST_LIMIT * 1.1)
+    result = self.execute_query_expect_success(self.client, query, exec_options)
+    assert "Cluster Memory Admitted: 1.10 GB" in str(result.runtime_profile), \
+           str(result.runtime_profile)
+
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -695,8 +722,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,
       pool_max_mem=10 * PROC_MEM_TEST_LIMIT,
-      queue_wait_timeout_ms=2 * STATESTORE_RPC_FREQUENCY_MS),
-      start_args="--per_impalad_args=-mem_limit=3G;-mem_limit=3G;-mem_limit=2G",
+      queue_wait_timeout_ms=2 * STATESTORE_RPC_FREQUENCY_MS)
+      + " -clamp_query_mem_limit_backend_mem_limit=false",
+      start_args="--per_impalad_args=-mem_limit=3G;-mem_limit=3G;-mem_limit=2G;",
       statestored_args=_STATESTORED_ARGS)
   def test_heterogeneous_proc_mem_limit(self, vector):
     """ Test to ensure that the admission controller takes into account the actual proc
@@ -720,14 +748,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     exec_options['num_nodes'] = "1"
     self.execute_query_expect_success(self.client, query, exec_options)
     # Exercise rejection checks in admission controller.
-    try:
-      exec_options = copy(vector.get_value('exec_option'))
-      exec_options['mem_limit'] = "3G"
-      self.execute_query(query, exec_options)
-    except ImpalaBeeswaxException as e:
-      assert re.search("Rejected query from pool \S+: request memory needed 3.00 GB"
-          " is greater than memory available for admission 2.00 GB of \S+", str(e)), \
-          str(e)
+    exec_options = copy(vector.get_value('exec_option'))
+    exec_options['mem_limit'] = "3G"
+    ex = self.execute_query_expect_failure(self.client, query, exec_options)
+    assert ("Rejected query from pool default-pool: request memory needed "
+            "3.00 GB is greater than memory available for admission 2.00 GB" in
+            str(ex)), str(ex)
     # Exercise queuing checks in admission controller.
     try:
       # Wait for previous queries to finish to avoid flakiness.