Posted to commits@impala.apache.org by tm...@apache.org on 2018/10/05 21:39:14 UTC

[2/8] impala git commit: IMPALA-7349: Add Admission control support for automatically setting per host memory limit for a query

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test b/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
new file mode 100644
index 0000000..8cd0d92
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
@@ -0,0 +1,153 @@
+====
+---- QUERY
+# All queries in this file are run with num_nodes=1 unless specified otherwise.
+############################
+# No mem_limit set
+# Check that mem_admitted is the same as mem_estimate.
+set request_pool=regularPool;
+# set this to make estimates deterministic.
+set num_scanner_threads=2;
+select * from (select * from functional_parquet.alltypes limit 10) A,
+ (select * from functional_parquet.alltypes limit 10) B;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=64MB.*
+row_regex: .*Cluster Memory Admitted: 64.00 MB.*
+====
+---- QUERY
+# No mem_limit set
+# lower bound enforced based on largest min_reservation (32.09 MB for this query)
+set request_pool=poolLowMinLimit;
+select * from functional_parquet.alltypes limit 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Cluster Memory Admitted: 32.09 MB.*
+====
+---- QUERY
+# No mem_limit set
+# lower bound enforced based on largest min_reservation (32.09 MB for this query), but
+# the upper bound enforced by pool.max_query_mem_limit takes precedence, which causes
+# the query to be rejected eventually.
+set request_pool=poolLowMaxLimit;
+select * from functional_parquet.alltypes limit 1;
+---- CATCH
+Rejected query from pool root.poolLowMaxLimit: minimum memory reservation is greater than
+ memory available to the query for buffer reservations. Memory reservation needed given
+ the current plan: 88.00 KB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 32.09 MB. Note that changing the mem_limit may also change
+ the plan. See the query profile for more information about the per-node memory
+ requirements.
+====
+---- QUERY
+# No mem_limit set
+# Upper bound enforced by pool.max_query_mem_limit
+set request_pool=regularPool;
+# set this to make estimates deterministic.
+set num_scanner_threads=2;
+select * from functional_parquet.alltypes A, functional_parquet.alltypes B where
+ A.int_col = B.int_col limit 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=2.06GB.*
+row_regex: .*Cluster Memory Admitted: 1.50 GB.*
+====
+---- QUERY
+# No mem_limit set
+# Lower bound enforced by pool.min_query_mem_limit
+set request_pool=regularPool;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=10MB.*
+row_regex: .*Cluster Memory Admitted: 50.00 MB.*
+====
+---- QUERY
+############################
+# mem_limit is set
+# Check that mem_admitted is the same as the mem_limit set in query options.
+set request_pool=regularPool;
+set mem_limit=200mb;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 200.00 MB.*
+====
+---- QUERY
+# mem_limit is set
+# No lower bound enforced based on largest min_reservation (32.09 MB for this query)
+set request_pool=poolLowMinLimit;
+set mem_limit=27mb;
+select * from functional_parquet.alltypes limit 1;
+---- CATCH
+Rejected query from pool root.poolLowMinLimit: minimum memory reservation is greater than
+ memory available to the query for buffer reservations. Memory reservation needed given
+ the current plan: 88.00 KB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 32.09 MB. Note that changing the mem_limit may also change
+ the plan. See the query profile for more information about the per-node memory
+ requirements.
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is true
+# Upper bound using pool.max_query_mem_limit
+set request_pool=regularPool;
+set mem_limit=2G;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 1.50 GB.*
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is true
+# Lower bound using pool.min_query_mem_limit
+set request_pool=regularPool;
+set mem_limit=40mb;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 50.00 MB.*
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is false
+# Upper bound using pool.max_query_mem_limit
+set request_pool=regularPoolWithoutClamping;
+set mem_limit=2G;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 2.00 GB.*
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is false
+# Lower bound using pool.min_query_mem_limit
+set request_pool=regularPoolWithoutClamping;
+set mem_limit=50mb;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 50.00 MB.*
+====
+---- QUERY
+############################
+# Old behaviour: Both pool.max_query_mem_limit and pool.min_query_mem_limit are zero
+# No mem_limit set. Check that the mem_estimate is used as mem_admitted and that the
+# query is allowed to run, i.e. it passes the min mem_limit check based on the largest
+# min_reservation, because mem_limit is -1 (it is not set in query options).
+set request_pool=poolNoMemLimits;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=10MB.*
+row_regex: .*Cluster Memory Admitted: 10.00 MB.*
+====
+---- QUERY
+############################
+# Invalid pool config
+# min_query_mem_limit is greater than the max_query_mem_limit
+set request_pool=maxLessThanMinLimit;
+select 1;
+---- CATCH
+Rejected query from pool root.maxLessThanMinLimit: Invalid pool config: the
+ min_query_mem_limit is greater than the max_query_mem_limit (100001 > 100000)
+====
+---- QUERY
+# Invalid pool config
+# min_query_mem_limit is greater than the max_mem_resources
+set request_pool=maxMemLessThanMinLimit;
+select 1;
+---- CATCH
+Rejected query from pool root.maxMemLessThanMinLimit: Invalid pool config: the
+ min_query_mem_limit is greater than the max_mem_resources (2621440001 > 2621440000)
+====

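The expected values above follow the per-host memory limit selection this patch adds.
Below is a minimal sketch of that logic, reconstructed from the test comments alone;
the function and parameter names are placeholders rather than Impala's actual code,
and raising stands in for admission rejection:

  # Sketch of the per-host memory limit selection exercised by the tests above.
  # Reconstructed from the test comments; not Impala's actual implementation.
  def choose_per_host_mem_limit(mem_limit, mem_estimate, largest_min_reservation,
                                pool_min, pool_max, clamp_query_option):
    if mem_limit is None:
      # Old behaviour: with both pool limits at zero the estimate is used as-is
      # and the min_reservation check is skipped (mem_limit stays -1).
      if pool_min == 0 and pool_max == 0:
        return mem_estimate
      # Start from the estimate, raise it to the largest min_reservation and to
      # pool.min_query_mem_limit; pool.max_query_mem_limit takes precedence.
      mem = max(mem_estimate, largest_min_reservation, pool_min)
      if pool_max > 0:
        mem = min(mem, pool_max)
    elif clamp_query_option:
      # mem_limit set and pool.clamp_mem_limit_query_option is true: clamp it to
      # the pool's [min_query_mem_limit, max_query_mem_limit] where those are set.
      mem = max(mem_limit, pool_min)
      if pool_max > 0:
        mem = min(mem, pool_max)
    else:
      # pool.clamp_mem_limit_query_option is false: mem_limit is used unchanged.
      mem = mem_limit
    if mem < largest_min_reservation:
      raise RuntimeError("Rejected: minimum memory reservation is greater than "
                         "memory available to the query for buffer reservations")
    return mem
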
http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
index e658c09..66f9047 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
@@ -5,7 +5,9 @@ select distinct * from functional_parquet.alltypesagg
 ---- CATCH
 minimum memory reservation is greater than memory available to the
  query for buffer reservations. Memory reservation needed given the
- current plan: 68.09 MB. Set mem_limit to at least 100.09 MB.
+ current plan: 68.09 MB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 100.09 MB.
 ====
 ---- QUERY
 set mem_limit=150mb;
@@ -30,7 +32,9 @@ from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
 ---- CATCH
 minimum memory reservation is greater than memory available to the
  query for buffer reservations. Memory reservation needed given the
- current plan: 14.75 MB. Set mem_limit to at least 46.75 MB.
+ current plan: 14.75 MB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 46.75 MB.
 ====
 ---- QUERY
 set mem_limit=50mb;
@@ -39,5 +43,7 @@ from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
 ---- CATCH
 minimum memory reservation is greater than memory available to the
  query for buffer reservations. Memory reservation needed given the
- current plan: 26.00 MB. Set mem_limit to at least 58.00 MB.
+ current plan: 26.00 MB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 58.00 MB.
 ====

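A consistency check on the updated messages: in all three hunks the suggested lower
bound exceeds the plan's minimum reservation by exactly 32.00 MB, so the bound appears
to be the reservation plus a fixed cushion. A sketch of that arithmetic, assuming the
cushion is a constant (inferred from these messages, not taken from Impala's code):

  # Sketch only: each expected message above suggests a mem_limit bound equal to
  # the plan's minimum reservation plus an (assumed constant) 32.00 MB cushion.
  CUSHION_MB = 32.00  # inferred from the three messages above

  def suggested_min_mem_limit_mb(plan_min_reservation_mb):
    return round(plan_min_reservation_mb + CUSHION_MB, 2)

  assert suggested_min_mem_limit_mb(68.09) == 100.09
  assert suggested_min_mem_limit_mb(14.75) == 46.75
  assert suggested_min_mem_limit_mb(26.00) == 58.00
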
http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/tests/common/resource_pool_config.py
----------------------------------------------------------------------
diff --git a/tests/common/resource_pool_config.py b/tests/common/resource_pool_config.py
new file mode 100644
index 0000000..adab034
--- /dev/null
+++ b/tests/common/resource_pool_config.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Basic helper class for making dynamic changes to the admission controller config files.
+# This is pretty bare-bones at the moment and only contains functionality necessary for
+# the tests it is used for. However, it is generic enough that it can be extended if
+# more functionality is needed by new tests.
+
+import os
+from time import sleep, time
+import xml.etree.ElementTree as ET
+
+
+class ResourcePoolConfig(object):
+
+  # Mapping of config strings used in the llama_site file to those used on the impala
+  # metrics debug page. Add to this dictionary if other configs are needed for tests.
+  CONFIG_TO_METRIC_STR_MAPPING = {'max-query-mem-limit': 'pool-max-query-mem-limit'}
+
+  def __init__(self, impala_service, llama_site_path):
+    self.impala_service = impala_service
+    self.llama_site_path = llama_site_path
+    tree = ET.parse(llama_site_path)
+    self.root = tree.getroot()
+
+  def set_config_value(self, pool_name, config_str, target_val):
+    """Sets the value for the config parameter 'config_str' for the 'pool_name'
+    resource pool"""
+    node = self.__find_xml_node(self.root, pool_name, config_str)
+    node.find('value').text = str(target_val)
+    self.__write_xml_to_file(self.root, self.llama_site_path)
+    self.__wait_for_impala_to_pickup_config_change(pool_name, config_str, str(target_val))
+
+  def __wait_for_impala_to_pickup_config_change(
+      self, pool_name, config_str, target_val, timeout=20):
+    """Helper method that constantly sends a query for the 'pool_name' resource pool that
+    will be rejected but as a side effect would initiate a refresh of the pool config.
+    Then on every refresh it checks the pool metric corresponding to 'config_str' to see
+    if impala as picked up the change to that metric and is now equal to the
+    'target'val'. Times out after 'timeout' seconds"""
+    metric_str = self.CONFIG_TO_METRIC_STR_MAPPING[config_str]
+    client = self.impala_service.create_beeswax_client()
+    client.set_configuration_option('request_pool', pool_name)
+    # set mem_limit to something above the proc limit so that the query always gets
+    # rejected.
+    client.set_configuration_option('mem_limit', '10G')
+    metric_key = "admission-controller.{0}.root.{1}".format(metric_str, pool_name)
+    start_time = time()
+    while (time() - start_time < timeout):
+      handle = client.execute_async("select 1")
+      client.close_query(handle)
+      current_val = str(self.impala_service.get_metric_value(metric_key))
+      if current_val == target_val:
+        return
+      sleep(0.1)
+    assert False, "Timed out waiting for {0} to reach {1}. Current: {2}".format(
+      metric_key, target_val, current_val)
+
+  def __write_xml_to_file(self, xml_root, file_name):
+    # Make sure the change to the file is atomic. Write to a temp file and replace the
+    # original with it.
+    temp_path = file_name + "-temp"
+    file_handle = open(temp_path, "w")
+    file_handle.write(ET.tostring(xml_root))
+    file_handle.flush()
+    os.fsync(file_handle.fileno())
+    file_handle.close()
+    os.rename(temp_path, file_name)
+
+  def __find_xml_node(self, xml_root, pool_name, pool_attribute):
+    """Returns the xml node corresponding to the 'pool_attribute' for the 'pool_name'"""
+    for property in xml_root.iter('property'):
+      try:
+        name = property.find('name').text
+        # e.g. name = impala.admission-control.max-query-mem-limit-bytes.root.pool_name
+        if pool_name == name.split('.')[-1] and pool_attribute in name:
+          return property
+      except Exception as e:
+        print "Current DOM element being inspected: \n{0}".format(ET.dump(property))
+        raise e
+    assert False, "{0} attribute not found for pool {1} in the config XML:\n{2}".format(
+      pool_attribute, pool_name, ET.dump(xml_root))

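The new test_pool_config_change_while_queued test below shows this class in use; a
condensed usage sketch, assuming a started custom cluster ('cluster' here stands for
the CustomClusterTestSuite cluster attribute) and the copied config file produced by
impalad_admission_ctrl_config_args(..., make_copy=True):

  # Condensed usage sketch of ResourcePoolConfig, mirroring the new test below.
  import os
  from tests.common.resource_pool_config import ResourcePoolConfig

  llama_site_path = os.path.join(
      os.environ['IMPALA_HOME'], "fe", "src", "test", "resources",
      "copy-mem-limit-test-llama-site.xml")
  config = ResourcePoolConfig(cluster.impalads[0].service, llama_site_path)
  # Make the pool config invalid (max-query-mem-limit below min_query_mem_limit),
  # then restore the unset default; each call blocks until the impalad metric
  # reflects the new value.
  config.set_config_value('invalidTestPool', 'max-query-mem-limit', 1)
  config.set_config_value('invalidTestPool', 'max-query-mem-limit', 0)
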
http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 2d909d8..24e2ce3 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -22,6 +22,7 @@ import logging
 import os
 import pytest
 import re
+import shutil
 import sys
 import threading
 from copy import copy
@@ -32,10 +33,12 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import specific_build_type_timeout, IMPALAD_BUILD
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.resource_pool_config import ResourcePoolConfig
 from tests.common.skip import (
     SkipIfS3,
     SkipIfADLS,
-    SkipIfEC)
+    SkipIfEC,
+    SkipIfNotHdfsMinicluster)
 from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
@@ -43,7 +46,6 @@ from tests.common.test_vector import ImpalaTestDimension
 from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
 from ImpalaService import ImpalaHiveServer2Service
 from TCLIService import TCLIService
-from tests.verifiers.metric_verifier import MetricVerifier
 
 LOG = logging.getLogger('admission_test')
 
@@ -129,7 +131,7 @@ MEM_TEST_LIMIT = 12 * 1024 * 1024 * 1024
 
 _STATESTORED_ARGS = ("-statestore_heartbeat_frequency_ms={freq_ms} "
                      "-statestore_priority_update_frequency_ms={freq_ms}").format(
-                    freq_ms=STATESTORE_RPC_FREQUENCY_MS)
+  freq_ms=STATESTORE_RPC_FREQUENCY_MS)
 
 # Name of the subscriber metric tracking the admission control update interval.
 REQUEST_QUEUE_UPDATE_INTERVAL =\
@@ -149,40 +151,55 @@ QUERY_END_TIMEOUT_S = 1
 INITIAL_QUEUE_REASON_REGEX = \
     "Initial admission queue reason: waited [0-9]* ms, reason: .*"
 
+# The path to resources directory which contains the admission control config files.
+RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources")
+
+
 def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
-    proc_mem_limit = None, queue_wait_timeout_ms=None):
+                                 proc_mem_limit=None, queue_wait_timeout_ms=None):
   extra_flags = ""
   if proc_mem_limit is not None:
     extra_flags += " -mem_limit={0}".format(proc_mem_limit)
   if queue_wait_timeout_ms is not None:
     extra_flags += " -queue_wait_timeout_ms={0}".format(queue_wait_timeout_ms)
   return ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
-      "-default_pool_max_queued {1} -default_pool_mem_limit {2} {3}".format(
-      max_requests, max_queued, pool_max_mem, extra_flags))
-
-
-def impalad_admission_ctrl_config_args(additional_args=""):
-  impalad_home = os.environ['IMPALA_HOME']
-  resources_dir = os.path.join(impalad_home, "fe", "src", "test", "resources")
-  fs_allocation_path = os.path.join(resources_dir, "fair-scheduler-test2.xml")
-  llama_site_path = os.path.join(resources_dir, "llama-site-test2.xml")
+          "-default_pool_max_queued {1} -default_pool_mem_limit {2} {3}".format(
+            max_requests, max_queued, pool_max_mem, extra_flags))
+
+
+def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file,
+                                        additional_args="", make_copy=False):
+  fs_allocation_path = os.path.join(RESOURCES_DIR, fs_allocation_file)
+  llama_site_path = os.path.join(RESOURCES_DIR, llama_site_file)
+  if make_copy:
+    copy_fs_allocation_path = os.path.join(RESOURCES_DIR, "copy-" + fs_allocation_file)
+    copy_llama_site_path = os.path.join(RESOURCES_DIR, "copy-" + llama_site_file)
+    shutil.copy2(fs_allocation_path, copy_fs_allocation_path)
+    shutil.copy2(llama_site_path, copy_llama_site_path)
+    fs_allocation_path = copy_fs_allocation_path
+    llama_site_path = copy_llama_site_path
   return ("-vmodule admission-controller=3 -fair_scheduler_allocation_path %s "
-        "-llama_site_path %s %s" % (fs_allocation_path, llama_site_path, additional_args))
+          "-llama_site_path %s %s" % (fs_allocation_path, llama_site_path,
+                                      additional_args))
+
 
 def log_metrics(log_prefix, metrics):
-  LOG.info("%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "\
+  LOG.info("%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "
       "released=%s, timed-out=%s", log_prefix, metrics['admitted'], metrics['queued'],
       metrics['dequeued'], metrics['rejected'], metrics['released'],
       metrics['timed-out'])
 
+
 def compute_metric_deltas(m2, m1):
   """Returns a dictionary of the differences of metrics in m2 and m1 (m2 - m1)"""
   return dict((n, m2.get(n, 0) - m1.get(n, 0)) for n in m2.keys())
 
+
 def metric_key(pool_name, metric_name):
   """Helper method to construct the admission controller metric keys"""
   return "admission-controller.%s.%s" % (metric_name, pool_name)
 
+
 class TestAdmissionControllerBase(CustomClusterTestSuite):
   @classmethod
   def get_workload(self):
@@ -196,6 +213,7 @@ class TestAdmissionControllerBase(CustomClusterTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
       create_uncompressed_text_dimension(cls.get_workload()))
 
+
 class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
   def __check_pool_rejected(self, client, pool, expected_error_re):
     try:
@@ -227,7 +245,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     execute_statement_req.sessionHandle = self.session_handle
     execute_statement_req.confOverlay = {'request_pool': pool_name}
     if mem_limit is not None: execute_statement_req.confOverlay['mem_limit'] = mem_limit
-    execute_statement_req.statement = "select 1";
+    execute_statement_req.statement = "select 1"
     execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
     HS2TestSuite.check_response(execute_statement_resp)
 
@@ -262,7 +280,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
         handles.append(client.execute_async(query))
       for query, handle in zip(queries, handles):
         self._wait_for_state(client, handle, client.QUERY_STATES['FINISHED'], timeout_s)
-        results = self.client.fetch(query, handle)
+        self.client.fetch(query, handle)
         profiles.append(self.client.get_runtime_profile(handle))
       return profiles
     finally:
@@ -279,7 +297,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args=impalad_admission_ctrl_config_args(),
+      impalad_args=impalad_admission_ctrl_config_args(
+        fs_allocation_file="fair-scheduler-test2.xml",
+        llama_site_file="llama-site-test2.xml"),
       default_query_options=[('mem_limit', 200000000)],
       statestored_args=_STATESTORED_ARGS)
   @needs_session(conf_overlay={'batch_size': '100'})
@@ -290,7 +310,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     impalad = self.cluster.impalads[0]
     client = impalad.service.create_beeswax_client()
     # Expected default mem limit for queueA, used in several tests below
-    queueA_mem_limit = "MEM_LIMIT=%s" % (128*1024*1024)
+    queueA_mem_limit = "MEM_LIMIT=%s" % (128 * 1024 * 1024)
     try:
       for pool in ['', 'not_a_pool_name']:
         expected_error =\
@@ -309,7 +329,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       # Query should execute in queueB which doesn't have a default mem limit set in the
       # llama-site.xml, so it should inherit the value from the default process query
       # options.
-      self.__check_query_options(result.runtime_profile,\
+      self.__check_query_options(result.runtime_profile,
           ['MEM_LIMIT=200000000', 'REQUEST_POOL=root.queueB'])
 
       # Try setting the pool for a queue with a very low queue timeout.
@@ -320,7 +340,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       self.__check_pool_rejected(client, 'root.queueA', "exceeded timeout")
       assert client.get_state(handle) == client.QUERY_STATES['FINISHED']
       # queueA has default query options mem_limit=128m,query_timeout_s=5
-      self.__check_query_options(client.get_runtime_profile(handle),\
+      self.__check_query_options(client.get_runtime_profile(handle),
           [queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA'])
       client.close_query(handle)
 
@@ -330,8 +350,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       client.execute("set mem_limit=31337")
       client.execute("set abort_on_error=1")
       result = client.execute("select 1")
-      self.__check_query_options(result.runtime_profile,\
-          ['MEM_LIMIT=31337', 'ABORT_ON_ERROR=1', 'QUERY_TIMEOUT_S=5',\
+      self.__check_query_options(result.runtime_profile,
+          ['MEM_LIMIT=31337', 'ABORT_ON_ERROR=1', 'QUERY_TIMEOUT_S=5',
            'REQUEST_POOL=root.queueA'])
 
       # Should be able to set query options (overriding defaults if applicable) with the
@@ -339,8 +359,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       # max_io_buffers has no proc/pool default.
       client.set_configuration({'request_pool': 'root.queueA', 'mem_limit': '12345'})
       result = client.execute("select 1")
-      self.__check_query_options(result.runtime_profile,\
-          ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',\
+      self.__check_query_options(result.runtime_profile,
+          ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',
            'ABORT_ON_ERROR=1'])
 
       # Once options are reset to their defaults, the queue
@@ -349,7 +369,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       # abort on error, because it's back to being the default.
       client.execute('set mem_limit=""')
       client.execute('set abort_on_error=""')
-      client.set_configuration({ 'request_pool': 'root.queueA' })
+      client.set_configuration({'request_pool': 'root.queueA'})
       result = client.execute("select 1")
       self.__check_query_options(result.runtime_profile,
             [queueA_mem_limit, 'REQUEST_POOL=root.queueA', 'QUERY_TIMEOUT_S=5'])
@@ -363,18 +383,21 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     batch_size = "BATCH_SIZE=100"
 
     # Check HS2 query in queueA gets the correct query options for the pool.
-    self.__check_hs2_query_opts("root.queueA", None,\
+    self.__check_hs2_query_opts("root.queueA", None,
         [queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', batch_size])
     # Check overriding the mem limit sent in the confOverlay with the query.
-    self.__check_hs2_query_opts("root.queueA", '12345',\
+    self.__check_hs2_query_opts("root.queueA", '12345',
         ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', batch_size])
     # Check HS2 query in queueB gets the process-wide default query options
-    self.__check_hs2_query_opts("root.queueB", None,\
+    self.__check_hs2_query_opts("root.queueB", None,
         ['MEM_LIMIT=200000000', 'REQUEST_POOL=root.queueB', batch_size])
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args=impalad_admission_ctrl_config_args("-require_username"),
+      impalad_args=impalad_admission_ctrl_config_args(
+        fs_allocation_file="fair-scheduler-test2.xml",
+        llama_site_file="llama-site-test2.xml",
+        additional_args="-require_username"),
       statestored_args=_STATESTORED_ARGS)
   def test_require_user(self):
     open_session_req = TCLIService.TOpenSessionReq()
@@ -394,7 +417,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       close_req.sessionHandle = open_session_resp.sessionHandle
       TestAdmissionController.check_response(self.hs2_client.CloseSession(close_req))
 
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
@@ -503,7 +525,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       impalad_with_2g_mem.set_configuration_option('mem_limit', '1G')
       impalad_with_2g_mem.execute_async("select sleep(1000)")
       # Wait for statestore update to update the mem admitted in each node.
-      sleep(STATESTORE_RPC_FREQUENCY_MS/1000)
+      sleep(STATESTORE_RPC_FREQUENCY_MS / 1000)
       exec_options = copy(vector.get_value('exec_option'))
       exec_options['mem_limit'] = "2G"
       # Since Queuing is synchronous and we can't close the previous query till this
@@ -518,7 +540,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    impalad_args= "--logbuflevel=-1 " + impalad_admission_ctrl_flags(max_requests=1,
+    impalad_args="--logbuflevel=-1 " + impalad_admission_ctrl_flags(max_requests=1,
         max_queued=1, pool_max_mem=PROC_MEM_TEST_LIMIT),
     statestored_args=_STATESTORED_ARGS)
   def test_cancellation(self):
@@ -528,7 +550,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     client = impalad.service.create_beeswax_client()
     try:
       client.set_configuration_option("debug_action", "CRS_BEFORE_ADMISSION:SLEEP@2000")
-      client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1 )
+      client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1)
       handle = client.execute_async("select 1")
       sleep(1)
       client.close_query(handle)
@@ -638,7 +660,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
         "9.00 MB but only 1.00 MB was available."
     NUM_QUERIES = 5
     profiles = self._execute_and_collect_profiles([STMT for i in xrange(NUM_QUERIES)],
-        TIMEOUT_S, {'mem_limit':'9mb'})
+        TIMEOUT_S, {'mem_limit': '9mb'})
 
     num_reasons = len([profile for profile in profiles if EXPECTED_REASON in profile])
     assert num_reasons == NUM_QUERIES - 1, \
@@ -702,6 +724,95 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       assert num_q == expected_num, "There should be {0} running queries on either " \
                                     "impalads: {0}".format(query_locations)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+      llama_site_file="mem-limit-test-llama-site.xml", make_copy=True),
+    statestored_args=_STATESTORED_ARGS)
+  def test_pool_mem_limit_configs(self, vector):
+    """Runs functional tests for the max/min_query_mem_limit pool config attributes"""
+    exec_options = vector.get_value('exec_option')
+    # Set this to the default.
+    exec_options['exec_single_node_rows_threshold'] = 100
+    # Set num_nodes to 1 since its easier to see one-to-one mapping of per_host and
+    # per_cluster values used in the test.
+    exec_options['num_nodes'] = 1
+    self.run_test_case('QueryTest/admission-max-min-mem-limits', vector)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+      llama_site_file="mem-limit-test-llama-site.xml",
+      additional_args="-default_pool_max_requests 1", make_copy=True),
+    statestored_args=_STATESTORED_ARGS)
+  def test_pool_config_change_while_queued(self, vector):
+    """Tests if the invalid checks work even if the query is queued. Makes sure the query
+    is not dequeued if the config is invalid and is promptly dequeued when it goes back
+    to being valid"""
+    pool_name = "invalidTestPool"
+    config_str = "max-query-mem-limit"
+    self.client.set_configuration_option('request_pool', pool_name)
+    # Setup to queue a query.
+    sleep_query_handle = self.client.execute_async("select sleep(10000)")
+    self.client.wait_for_admission_control(sleep_query_handle)
+    queued_query_handle = self.client.execute_async("select 1")
+    self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
+
+    # Change config to be invalid.
+    llama_site_path = os.path.join(RESOURCES_DIR, "copy-mem-limit-test-llama-site.xml")
+    config = ResourcePoolConfig(self.cluster.impalads[0].service, llama_site_path)
+    config.set_config_value(pool_name, config_str, 1)
+    # Close running query so the queued one gets a chance.
+    self.client.close_query(sleep_query_handle)
+
+    # Check latest queued reason changed
+    queued_reason = "Latest admission queue reason: Invalid pool config: the " \
+                    "min_query_mem_limit is greater than the max_query_mem_limit" \
+                    " (26214400 > 1)"
+    self.__wait_for_change_to_profile(queued_query_handle, queued_reason)
+
+    # Now change the config back to a valid value and make sure the query runs.
+    config.set_config_value(pool_name, config_str, 0)
+    self.client.wait_for_finished_timeout(queued_query_handle, 20)
+    self.close_query(queued_query_handle)
+
+    # Now do the same thing for a change to pool.max-query-mem-limit such that it can
+    # no longer accommodate the largest min_reservation.
+    # Setup to queue a query.
+    sleep_query_handle = self.client.execute_async("select sleep(10000)")
+    self.client.wait_for_admission_control(sleep_query_handle)
+    queued_query_handle = self.client.execute_async(
+      "select * from functional_parquet.alltypes limit 1")
+    self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
+    # Change config to something less than what is required to accommodate the
+    # largest min_reservation (which in this case is 32.09 MB).
+    config.set_config_value(pool_name, config_str, 25 * 1024 * 1024)
+    # Close running query so the queued one gets a chance.
+    self.client.close_query(sleep_query_handle)
+    # Check latest queued reason changed
+    queued_reason = "minimum memory reservation is greater than memory available to " \
+                    "the query for buffer reservations. Memory reservation needed given" \
+                    " the current plan: 88.00 KB. Adjust either the mem_limit or the" \
+                    " pool config (max-query-mem-limit, min-query-mem-limit) for the" \
+                    " query to allow the query memory limit to be at least 32.09 MB."
+    self.__wait_for_change_to_profile(queued_query_handle, queued_reason, 5)
+    # Now change the config back to a reasonable value.
+    config.set_config_value(pool_name, config_str, 0)
+    self.client.wait_for_finished_timeout(queued_query_handle, 20)
+    self.close_query(queued_query_handle)
+
+  def __wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
+    for _ in range(timeout * 10):
+      profile = self.client.get_runtime_profile(query_handle)
+      if search_string in profile:
+        return
+      sleep(0.1)
+    assert False, "Timed out waiting for change to profile\nSearch " \
+                  "String: {0}\nProfile:\n{1}".format(search_string, str(profile))
+
 
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
@@ -753,8 +864,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       num_queries = 30
       cls.ImpalaTestMatrix.add_constraint(
           lambda v: v.get_value('submission_delay_ms') == 0)
-      cls.ImpalaTestMatrix.add_constraint(\
-          lambda v: v.get_value('round_robin_submission') == True)
+      cls.ImpalaTestMatrix.add_constraint(
+          lambda v: v.get_value('round_robin_submission'))
 
     if num_queries is not None:
       cls.ImpalaTestMatrix.add_constraint(
@@ -801,8 +912,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     The metrics names are shortened for brevity: 'admitted', 'queued', 'dequeued',
     'rejected', 'released', and 'timed-out'.
     """
-    metrics = {'admitted': 0, 'queued': 0, 'dequeued': 0, 'rejected' : 0,
-        'released': 0, 'timed-out': 0}
+    metrics = {'admitted': 0, 'queued': 0, 'dequeued': 0, 'rejected': 0,
+               'released': 0, 'timed-out': 0}
     for impalad in self.impalads:
       keys = [metric_key(self.pool_name, 'total-%s' % short_name)
               for short_name in metrics.keys()]
@@ -847,8 +958,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       current = self.get_admission_metrics()
       log_metrics("wait_for_metric_changes, current=", current)
       deltas = compute_metric_deltas(current, initial)
-      delta_sum = sum([ deltas[x] for x in metric_names ])
-      LOG.info("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",\
+      delta_sum = sum([deltas[x] for x in metric_names])
+      LOG.info("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",
           delta_sum, deltas, expected_delta, metric_names)
       if delta_sum >= expected_delta:
         LOG.info("Found all %s metrics after %s seconds", delta_sum,
@@ -857,13 +968,13 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       assert (time() - start_time < STRESS_TIMEOUT),\
           "Timed out waiting {0} seconds for metrics {1} delta {2} "\
           "current {3} initial {4}" .format(
-              STRESS_TIMEOUT, ','.join(metric_names), expected_delta, str(current), str(initial))
+              STRESS_TIMEOUT, ','.join(metric_names), expected_delta, str(current),
+              str(initial))
       sleep(1)
 
   def wait_for_statestore_updates(self, heartbeats):
     """Waits for a number of admission control statestore updates from all impalads."""
     start_time = time()
-    num_impalads = len(self.impalads)
     init = dict()
     curr = dict()
     for impalad in self.impalads:
@@ -898,7 +1009,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     while len(self.executing_threads) < num_threads:
       assert (time() - start_time < STRESS_TIMEOUT), ("Timed out waiting %s seconds for "
           "%s admitted client rpcs to return. Only %s executing " % (
-          STRESS_TIMEOUT, num_threads, len(self.executing_threads)))
+            STRESS_TIMEOUT, num_threads, len(self.executing_threads)))
       sleep(0.1)
     LOG.info("Found all %s admitted threads after %s seconds", num_threads,
         round(time() - start_time, 1))
@@ -955,7 +1066,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       # lock protects query_handle and shutdown, used by the main thread in teardown()
       self.lock = threading.RLock()
       self.query_handle = None
-      self.shutdown = False # Set by the main thread when tearing down
+      self.shutdown = False  # Set by the main thread when tearing down
 
     def run(self):
       client = None
@@ -1039,7 +1150,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
         # Sleep and wait for the query to be cancelled. The cancellation will
         # set the state to EXCEPTION.
         start_time = time()
-        while (client.get_state(self.query_handle) != \
+        while (client.get_state(self.query_handle) !=
                client.QUERY_STATES['EXCEPTION']):
           assert (time() - start_time < STRESS_TIMEOUT),\
             "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
@@ -1060,7 +1171,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     admission control."""
     for impalad in self.impalads:
       queries_json = impalad.service.get_debug_webpage_json('/queries')
-      for query in itertools.chain(queries_json['in_flight_queries'], \
+      for query in itertools.chain(queries_json['in_flight_queries'],
           queries_json['completed_queries']):
         if query['stmt_type'] == 'QUERY' or query['stmt_type'] == 'DML':
           assert query['last_event'] != 'Registered' and \
@@ -1090,8 +1201,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
 
     num_queries = vector.get_value('num_queries')
     assert num_queries >= MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
-    initial_metrics = self.get_admission_metrics();
-    log_metrics("Initial metrics: ", initial_metrics);
+    initial_metrics = self.get_admission_metrics()
+    log_metrics("Initial metrics: ", initial_metrics)
 
     for query_num in xrange(num_queries):
       impalad = self.impalads[query_num % len(self.impalads)]
@@ -1109,7 +1220,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     # without this thread explicitly ending them, so that the test can admit queries in
     # discrete waves.
     LOG.info("Wait for initial admission decisions")
-    (metric_deltas, curr_metrics) = self.wait_for_metric_changes(\
+    (metric_deltas, curr_metrics) = self.wait_for_metric_changes(
         ['admitted', 'queued', 'rejected'], initial_metrics, num_queries)
     # Also wait for the test threads that submitted the queries to start executing.
     self.wait_for_admitted_threads(metric_deltas['admitted'])
@@ -1129,7 +1240,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     assert metric_deltas['queued'] <= MAX_NUM_QUEUED_QUERIES * len(self.impalads),\
         "Queued too many queries: at least one daemon queued too many"
     assert metric_deltas['rejected'] + metric_deltas['admitted'] +\
-        metric_deltas['queued'] == num_queries ,\
+        metric_deltas['queued'] == num_queries,\
         "Initial admission decisions don't add up to {0}: {1}".format(
         num_queries, str(metric_deltas))
     initial_metric_deltas = metric_deltas
@@ -1144,8 +1255,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     # Admit queries in waves until all queries are done. A new wave of admission
     # is started by killing some of the running queries.
     while len(self.executing_threads) > 0:
-      curr_metrics = self.get_consistent_admission_metrics(num_queries);
-      log_metrics("Main loop, curr_metrics: ", curr_metrics);
+      curr_metrics = self.get_consistent_admission_metrics(num_queries)
+      log_metrics("Main loop, curr_metrics: ", curr_metrics)
       num_to_end = len(self.executing_threads)
       LOG.info("Main loop, will request %s queries to end", num_to_end)
       self.end_admitted_queries(num_to_end)
@@ -1166,8 +1277,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       # state or we may find an impalad dequeue more requests after we capture metrics.
       self.wait_for_statestore_updates(10)
 
-    final_metrics = self.get_consistent_admission_metrics(num_queries);
-    log_metrics("Final metrics: ", final_metrics);
+    final_metrics = self.get_consistent_admission_metrics(num_queries)
+    log_metrics("Final metrics: ", final_metrics)
     metric_deltas = compute_metric_deltas(final_metrics, initial_metrics)
     assert metric_deltas['timed-out'] == 0
 
@@ -1212,8 +1323,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args=impalad_admission_ctrl_config_args(),
-      statestored_args=_STATESTORED_ARGS)
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="fair-scheduler-test2.xml",
+      llama_site_file="llama-site-test2.xml"),
+    statestored_args=_STATESTORED_ARGS)
   def test_admission_controller_with_configs(self, vector):
     self.pool_name = 'root.queueB'
     self.run_admission_test(vector, {'request_pool': self.pool_name})
@@ -1241,7 +1354,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     # settings of the OS. It should be fine to continue anyway.
     proc_limit = self.get_proc_limit()
     if proc_limit != MEM_TEST_LIMIT:
-      LOG.info("Warning: Process mem limit %s is not expected val %s", limit_val,
+      LOG.info("Warning: Process mem limit %s is not expected val %s", proc_limit,
           MEM_TEST_LIMIT)
 
     self.pool_name = 'default-pool'