You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2018/10/05 21:39:14 UTC
[2/8] impala git commit: IMPALA-7349: Add Admission control support
for automatically setting per host memory limit for a query
http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test b/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
new file mode 100644
index 0000000..8cd0d92
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
@@ -0,0 +1,153 @@
+====
+---- QUERY
+# All queries in this file are run with num_nodes=1 by default unless specified.
+############################
+# No mem_limit set
+# check if mem_admitted is same as mem_estimate
+set request_pool=regularPool;
+# set this to make estimates deterministic.
+set num_scanner_threads=2;
+select * from (select * from functional_parquet.alltypes limit 10) A,
+ (select * from functional_parquet.alltypes limit 10) B;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=64MB.*
+row_regex: .*Cluster Memory Admitted: 64.00 MB.*
+====
+---- QUERY
+# No mem_limit set
+# lower bound enforced based on largest min_reservation (32.09 MB for this query)
+set request_pool=poolLowMinLimit;
+select * from functional_parquet.alltypes limit 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Cluster Memory Admitted: 32.09 MB.*
+====
+---- QUERY
+# No mem_limit set
+# lower bound enforced based on largest min_reservation (32.09 MB for this query) but the
+# upper bound enforced by pool.max_query_mem_limit takes precedence, which causes the
+# query to be rejected.
+set request_pool=poolLowMaxLimit;
+select * from functional_parquet.alltypes limit 1;
+---- CATCH
+Rejected query from pool root.poolLowMaxLimit: minimum memory reservation is greater than
+ memory available to the query for buffer reservations. Memory reservation needed given
+ the current plan: 88.00 KB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 32.09 MB. Note that changing the mem_limit may also change
+ the plan. See the query profile for more information about the per-node memory
+ requirements.
+====
+---- QUERY
+# No mem_limit set
+# Upper bound enforced by pool.max_query_mem_limit
+set request_pool=regularPool;
+# set this to make estimates deterministic.
+set num_scanner_threads=2;
+select * from functional_parquet.alltypes A, functional_parquet.alltypes B where
+ A.int_col = B.int_col limit 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=2.06GB.*
+row_regex: .*Cluster Memory Admitted: 1.50 GB.*
+====
+---- QUERY
+# No mem_limit set
+# Lower bound enforced by pool.min_query_mem_limit
+set request_pool=regularPool;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=10MB.*
+row_regex: .*Cluster Memory Admitted: 50.00 MB.*
+====
+---- QUERY
+############################
+# mem_limit is set
+# check if mem_admitted is same as mem_limit set in query options
+set request_pool=regularPool;
+set mem_limit=200mb;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 200.00 MB.*
+====
+---- QUERY
+# mem_limit is set
+# No lower bound enforced based on largest min_reservation (32.09 MB for this query)
+set request_pool=poolLowMinLimit;
+set mem_limit=27mb;
+select * from functional_parquet.alltypes limit 1;
+---- CATCH
+Rejected query from pool root.poolLowMinLimit: minimum memory reservation is greater than
+ memory available to the query for buffer reservations. Memory reservation needed given
+ the current plan: 88.00 KB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 32.09 MB. Note that changing the mem_limit may also change
+ the plan. See the query profile for more information about the per-node memory
+ requirements.
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is true
+# Upper bound using pool.max_query_mem_limit
+set request_pool=regularPool;
+set mem_limit=2G;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 1.50 GB.*
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is true
+# Lower bound using pool.min_query_mem_limit
+set request_pool=regularPool;
+set mem_limit=40mb;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 50.00 MB.*
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is false
+# Upper bound using pool.max_query_mem_limit
+set request_pool=regularPoolWithoutClamping;
+set mem_limit=2G;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 2.00 GB.*
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is false
+# Lower bound using pool.min_query_mem_limit
+set request_pool=regularPoolWithoutClamping;
+set mem_limit=50mb;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 50.00 MB.*
+====
+---- QUERY
+############################
+# Old behaviour: Both pool.max_query_mem_limit and pool.min_query_mem_limit are zero
+# No mem_limit set, check that the mem_estimate is used as mem_admitted and is allowed to
+# run which implies that it passes the check for min mem_limit required based on largest
+# min_reservation because the mem_limit is -1 (since it is not set in query options)
+set request_pool=poolNoMemLimits;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=10MB.*
+row_regex: .*Cluster Memory Admitted: 10.00 MB.*
+====
+---- QUERY
+############################
+# Invalid pool config
+# min_query_mem_limit is greater than the max_query_mem_limit
+set request_pool=maxLessThanMinLimit;
+Select 1;
+---- CATCH
+Rejected query from pool root.maxLessThanMinLimit: Invalid pool config: the
+ min_query_mem_limit is greater than the max_query_mem_limit (100001 > 100000)
+====
+---- QUERY
+# Invalid pool config
+# min_query_mem_limit is greater than the max_mem_resources
+set request_pool=maxMemLessThanMinLimit;
+Select 1;
+---- CATCH
+Rejected query from pool root.maxMemLessThanMinLimit: Invalid pool config: the
+ min_query_mem_limit is greater than the max_mem_resources (2621440001 > 2621440000)
+====
http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
index e658c09..66f9047 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
@@ -5,7 +5,9 @@ select distinct * from functional_parquet.alltypesagg
---- CATCH
minimum memory reservation is greater than memory available to the
query for buffer reservations. Memory reservation needed given the
- current plan: 68.09 MB. Set mem_limit to at least 100.09 MB.
+ current plan: 68.09 MB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 100.09 MB.
====
---- QUERY
set mem_limit=150mb;
@@ -30,7 +32,9 @@ from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
---- CATCH
minimum memory reservation is greater than memory available to the
query for buffer reservations. Memory reservation needed given the
- current plan: 14.75 MB. Set mem_limit to at least 46.75 MB.
+ current plan: 14.75 MB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 46.75 MB.
====
---- QUERY
set mem_limit=50mb;
@@ -39,5 +43,7 @@ from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
---- CATCH
minimum memory reservation is greater than memory available to the
query for buffer reservations. Memory reservation needed given the
- current plan: 26.00 MB. Set mem_limit to at least 58.00 MB.
+ current plan: 26.00 MB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 58.00 MB.
====
http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/tests/common/resource_pool_config.py
----------------------------------------------------------------------
diff --git a/tests/common/resource_pool_config.py b/tests/common/resource_pool_config.py
new file mode 100644
index 0000000..adab034
--- /dev/null
+++ b/tests/common/resource_pool_config.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Basic helper class for making dynamic changes to the admission controller config files.
+# This is pretty bare-bones at the moment and only contains functionality necessary for
+# the tests it is used for. However, it is generic enough that it can be extended if
+# more functionality is required for adding tests.
+
+import os
+from time import sleep, time
+import xml.etree.ElementTree as ET
+
+
+class ResourcePoolConfig(object):
+  """Edits a llama-site XML resource pool config file and waits for a running Impala
+  service to pick up each change."""
+
+  # Mapping of config strings used in the llama_site file with those used on the impala
+  # metrics debug page. Add to this dictionary if other configs are needed for tests.
+  CONFIG_TO_METRIC_STR_MAPPING = {'max-query-mem-limit': 'pool-max-query-mem-limit'}
+
+  def __init__(self, impala_service, llama_site_path):
+    """'impala_service' is used to create clients and read metrics. 'llama_site_path'
+    is the llama-site XML file that is parsed here and rewritten by set_config_value()."""
+    self.impala_service = impala_service
+    self.llama_site_path = llama_site_path
+    tree = ET.parse(llama_site_path)
+    self.root = tree.getroot()
+
+  def set_config_value(self, pool_name, config_str, target_val):
+    """Sets the value for the config parameter 'config_str' for the 'pool_name'
+    resource pool, writes the file back to disk and blocks until Impala reports the
+    new value through the corresponding pool metric."""
+    node = self.__find_xml_node(self.root, pool_name, config_str)
+    node.find('value').text = str(target_val)
+    self.__write_xml_to_file(self.root, self.llama_site_path)
+    self.__wait_for_impala_to_pickup_config_change(pool_name, config_str, str(target_val))
+
+  def __wait_for_impala_to_pickup_config_change(
+      self, pool_name, config_str, target_val, timeout=20):
+    """Helper method that repeatedly sends a query to the 'pool_name' resource pool
+    which is expected to be rejected, but which as a side effect initiates a refresh of
+    the pool config. After every refresh it checks whether the pool metric
+    corresponding to 'config_str' has picked up the change and is now equal to
+    'target_val'. Fails with an AssertionError after 'timeout' seconds."""
+    metric_str = self.CONFIG_TO_METRIC_STR_MAPPING[config_str]
+    metric_key = "admission-controller.{0}.root.{1}".format(metric_str, pool_name)
+    # Initialized so that the failure message is well-defined even if the loop body
+    # never runs (e.g. a non-positive 'timeout').
+    current_val = None
+    client = self.impala_service.create_beeswax_client()
+    try:
+      client.set_configuration_option('request_pool', pool_name)
+      # set mem_limit to something above the proc limit so that the query always gets
+      # rejected.
+      client.set_configuration_option('mem_limit', '10G')
+      start_time = time()
+      while time() - start_time < timeout:
+        handle = client.execute_async("select 1")
+        client.close_query(handle)
+        current_val = str(self.impala_service.get_metric_value(metric_key))
+        if current_val == target_val:
+          return
+        sleep(0.1)
+    finally:
+      # Release the client connection on every exit path, including the timeout.
+      client.close()
+    assert False, "Timed out waiting for {0} to reach {1}. Current: {2}".format(
+        metric_key, target_val, current_val)
+
+  def __write_xml_to_file(self, xml_root, file_name):
+    """Serializes 'xml_root' and replaces 'file_name' with the result."""
+    # Make sure the change to the file is atomic. Write to a temp file and replace the
+    # original with it (os.rename() replaces the target atomically on POSIX).
+    temp_path = file_name + "-temp"
+    # ET.tostring() returns encoded bytes, so write in binary mode. The 'with' block
+    # closes the handle even if the write or the fsync fails.
+    with open(temp_path, "wb") as file_handle:
+      file_handle.write(ET.tostring(xml_root))
+      file_handle.flush()
+      os.fsync(file_handle.fileno())
+    os.rename(temp_path, file_name)
+
+  def __find_xml_node(self, xml_root, pool_name, pool_attribute):
+    """Returns the <property> xml node whose name contains 'pool_attribute' and whose
+    last '.'-separated component is 'pool_name'. Fails with an AssertionError if none."""
+    for prop in xml_root.iter('property'):
+      try:
+        name = prop.find('name').text
+        # eg. of name = impala.admission-control.max-query-mem-limit-bytes.root.pool_name
+        if pool_name == name.split('.')[-1] and pool_attribute in name:
+          return prop
+      except Exception:
+        # ET.dump() writes to stdout and returns None, so use ET.tostring() to get the
+        # element's contents into the message.
+        print("Current DOM element being inspected: \n{0}".format(ET.tostring(prop)))
+        # A bare 'raise' preserves the original traceback.
+        raise
+    assert False, "{0} attribute not found for pool {1} in the config XML:\n{2}".format(
+        pool_attribute, pool_name, ET.tostring(xml_root))
http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 2d909d8..24e2ce3 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -22,6 +22,7 @@ import logging
import os
import pytest
import re
+import shutil
import sys
import threading
from copy import copy
@@ -32,10 +33,12 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.environ import specific_build_type_timeout, IMPALAD_BUILD
from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.resource_pool_config import ResourcePoolConfig
from tests.common.skip import (
SkipIfS3,
SkipIfADLS,
- SkipIfEC)
+ SkipIfEC,
+ SkipIfNotHdfsMinicluster)
from tests.common.test_dimensions import (
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
@@ -43,7 +46,6 @@ from tests.common.test_vector import ImpalaTestDimension
from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
from ImpalaService import ImpalaHiveServer2Service
from TCLIService import TCLIService
-from tests.verifiers.metric_verifier import MetricVerifier
LOG = logging.getLogger('admission_test')
@@ -129,7 +131,7 @@ MEM_TEST_LIMIT = 12 * 1024 * 1024 * 1024
_STATESTORED_ARGS = ("-statestore_heartbeat_frequency_ms={freq_ms} "
"-statestore_priority_update_frequency_ms={freq_ms}").format(
- freq_ms=STATESTORE_RPC_FREQUENCY_MS)
+ freq_ms=STATESTORE_RPC_FREQUENCY_MS)
# Name of the subscriber metric tracking the admission control update interval.
REQUEST_QUEUE_UPDATE_INTERVAL =\
@@ -149,40 +151,55 @@ QUERY_END_TIMEOUT_S = 1
INITIAL_QUEUE_REASON_REGEX = \
"Initial admission queue reason: waited [0-9]* ms, reason: .*"
+# Path to the resources directory which contains the admission control config files.
+# Evaluated at import time; os.environ['IMPALA_HOME'] raises KeyError if it is unset.
+RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources")
+
+
def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
- proc_mem_limit = None, queue_wait_timeout_ms=None):
+ proc_mem_limit=None, queue_wait_timeout_ms=None):
+ """Returns impalad command-line flags for the default admission control pool.
+ 'max_requests', 'max_queued' and 'pool_max_mem' become -default_pool_max_requests,
+ -default_pool_max_queued and -default_pool_mem_limit; -mem_limit and
+ -queue_wait_timeout_ms are appended only when 'proc_mem_limit' and
+ 'queue_wait_timeout_ms' are not None."""
extra_flags = ""
if proc_mem_limit is not None:
extra_flags += " -mem_limit={0}".format(proc_mem_limit)
if queue_wait_timeout_ms is not None:
extra_flags += " -queue_wait_timeout_ms={0}".format(queue_wait_timeout_ms)
return ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
- "-default_pool_max_queued {1} -default_pool_mem_limit {2} {3}".format(
- max_requests, max_queued, pool_max_mem, extra_flags))
-
-
-def impalad_admission_ctrl_config_args(additional_args=""):
- impalad_home = os.environ['IMPALA_HOME']
- resources_dir = os.path.join(impalad_home, "fe", "src", "test", "resources")
- fs_allocation_path = os.path.join(resources_dir, "fair-scheduler-test2.xml")
- llama_site_path = os.path.join(resources_dir, "llama-site-test2.xml")
+ "-default_pool_max_queued {1} -default_pool_mem_limit {2} {3}".format(
+ max_requests, max_queued, pool_max_mem, extra_flags))
+
+
+def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file,
+ additional_args="", make_copy=False):
+ """Returns impalad command-line flags that point admission control at the
+ fair-scheduler allocation file 'fs_allocation_file' and the llama-site file
+ 'llama_site_file' (both resolved relative to RESOURCES_DIR), followed by
+ 'additional_args'.
+
+ If 'make_copy' is True, both files are first copied with shutil.copy2 (overwriting
+ any existing copy) to "copy-"-prefixed files in RESOURCES_DIR and the flags point at
+ the copies, so a test can modify the config without touching the original files.
+ When this is called inside a decorator argument, as the tests in this file do, the
+ copy happens at module import time; nothing here removes the copies."""
+ fs_allocation_path = os.path.join(RESOURCES_DIR, fs_allocation_file)
+ llama_site_path = os.path.join(RESOURCES_DIR, llama_site_file)
+ if make_copy:
+ copy_fs_allocation_path = os.path.join(RESOURCES_DIR, "copy-" + fs_allocation_file)
+ copy_llama_site_path = os.path.join(RESOURCES_DIR, "copy-" + llama_site_file)
+ shutil.copy2(fs_allocation_path, copy_fs_allocation_path)
+ shutil.copy2(llama_site_path, copy_llama_site_path)
+ fs_allocation_path = copy_fs_allocation_path
+ llama_site_path = copy_llama_site_path
return ("-vmodule admission-controller=3 -fair_scheduler_allocation_path %s "
- "-llama_site_path %s %s" % (fs_allocation_path, llama_site_path, additional_args))
+ "-llama_site_path %s %s" % (fs_allocation_path, llama_site_path,
+ additional_args))
+
def log_metrics(log_prefix, metrics):
- LOG.info("%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "\
+ """Logs the admitted/queued/dequeued/rejected/released/timed-out counts in 'metrics'
+ (a dict keyed by those short names) via LOG.info, prefixed by 'log_prefix'."""
+ LOG.info("%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "
"released=%s, timed-out=%s", log_prefix, metrics['admitted'], metrics['queued'],
metrics['dequeued'], metrics['rejected'], metrics['released'],
metrics['timed-out'])
+
def compute_metric_deltas(m2, m1):
"""Returns a dictionary of the differences of metrics in m2 and m1 (m2 - m1)"""
return dict((n, m2.get(n, 0) - m1.get(n, 0)) for n in m2.keys())
+
def metric_key(pool_name, metric_name):
"""Helper method to construct the admission controller metric keys"""
return "admission-controller.%s.%s" % (metric_name, pool_name)
+
class TestAdmissionControllerBase(CustomClusterTestSuite):
@classmethod
def get_workload(self):
@@ -196,6 +213,7 @@ class TestAdmissionControllerBase(CustomClusterTestSuite):
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
+
class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
def __check_pool_rejected(self, client, pool, expected_error_re):
try:
@@ -227,7 +245,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
execute_statement_req.sessionHandle = self.session_handle
execute_statement_req.confOverlay = {'request_pool': pool_name}
if mem_limit is not None: execute_statement_req.confOverlay['mem_limit'] = mem_limit
- execute_statement_req.statement = "select 1";
+ execute_statement_req.statement = "select 1"
execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
HS2TestSuite.check_response(execute_statement_resp)
@@ -262,7 +280,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
handles.append(client.execute_async(query))
for query, handle in zip(queries, handles):
self._wait_for_state(client, handle, client.QUERY_STATES['FINISHED'], timeout_s)
- results = self.client.fetch(query, handle)
+ self.client.fetch(query, handle)
profiles.append(self.client.get_runtime_profile(handle))
return profiles
finally:
@@ -279,7 +297,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
- impalad_args=impalad_admission_ctrl_config_args(),
+ impalad_args=impalad_admission_ctrl_config_args(
+ fs_allocation_file="fair-scheduler-test2.xml",
+ llama_site_file="llama-site-test2.xml"),
default_query_options=[('mem_limit', 200000000)],
statestored_args=_STATESTORED_ARGS)
@needs_session(conf_overlay={'batch_size': '100'})
@@ -290,7 +310,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
impalad = self.cluster.impalads[0]
client = impalad.service.create_beeswax_client()
# Expected default mem limit for queueA, used in several tests below
- queueA_mem_limit = "MEM_LIMIT=%s" % (128*1024*1024)
+ queueA_mem_limit = "MEM_LIMIT=%s" % (128 * 1024 * 1024)
try:
for pool in ['', 'not_a_pool_name']:
expected_error =\
@@ -309,7 +329,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
# Query should execute in queueB which doesn't have a default mem limit set in the
# llama-site.xml, so it should inherit the value from the default process query
# options.
- self.__check_query_options(result.runtime_profile,\
+ self.__check_query_options(result.runtime_profile,
['MEM_LIMIT=200000000', 'REQUEST_POOL=root.queueB'])
# Try setting the pool for a queue with a very low queue timeout.
@@ -320,7 +340,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
self.__check_pool_rejected(client, 'root.queueA', "exceeded timeout")
assert client.get_state(handle) == client.QUERY_STATES['FINISHED']
# queueA has default query options mem_limit=128m,query_timeout_s=5
- self.__check_query_options(client.get_runtime_profile(handle),\
+ self.__check_query_options(client.get_runtime_profile(handle),
[queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA'])
client.close_query(handle)
@@ -330,8 +350,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
client.execute("set mem_limit=31337")
client.execute("set abort_on_error=1")
result = client.execute("select 1")
- self.__check_query_options(result.runtime_profile,\
- ['MEM_LIMIT=31337', 'ABORT_ON_ERROR=1', 'QUERY_TIMEOUT_S=5',\
+ self.__check_query_options(result.runtime_profile,
+ ['MEM_LIMIT=31337', 'ABORT_ON_ERROR=1', 'QUERY_TIMEOUT_S=5',
'REQUEST_POOL=root.queueA'])
# Should be able to set query options (overriding defaults if applicable) with the
@@ -339,8 +359,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
# max_io_buffers has no proc/pool default.
client.set_configuration({'request_pool': 'root.queueA', 'mem_limit': '12345'})
result = client.execute("select 1")
- self.__check_query_options(result.runtime_profile,\
- ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',\
+ self.__check_query_options(result.runtime_profile,
+ ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',
'ABORT_ON_ERROR=1'])
# Once options are reset to their defaults, the queue
@@ -349,7 +369,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
# abort on error, because it's back to being the default.
client.execute('set mem_limit=""')
client.execute('set abort_on_error=""')
- client.set_configuration({ 'request_pool': 'root.queueA' })
+ client.set_configuration({'request_pool': 'root.queueA'})
result = client.execute("select 1")
self.__check_query_options(result.runtime_profile,
[queueA_mem_limit, 'REQUEST_POOL=root.queueA', 'QUERY_TIMEOUT_S=5'])
@@ -363,18 +383,21 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
batch_size = "BATCH_SIZE=100"
# Check HS2 query in queueA gets the correct query options for the pool.
- self.__check_hs2_query_opts("root.queueA", None,\
+ self.__check_hs2_query_opts("root.queueA", None,
[queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', batch_size])
# Check overriding the mem limit sent in the confOverlay with the query.
- self.__check_hs2_query_opts("root.queueA", '12345',\
+ self.__check_hs2_query_opts("root.queueA", '12345',
['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', batch_size])
# Check HS2 query in queueB gets the process-wide default query options
- self.__check_hs2_query_opts("root.queueB", None,\
+ self.__check_hs2_query_opts("root.queueB", None,
['MEM_LIMIT=200000000', 'REQUEST_POOL=root.queueB', batch_size])
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
- impalad_args=impalad_admission_ctrl_config_args("-require_username"),
+ impalad_args=impalad_admission_ctrl_config_args(
+ fs_allocation_file="fair-scheduler-test2.xml",
+ llama_site_file="llama-site-test2.xml",
+ additional_args="-require_username"),
statestored_args=_STATESTORED_ARGS)
def test_require_user(self):
open_session_req = TCLIService.TOpenSessionReq()
@@ -394,7 +417,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
close_req.sessionHandle = open_session_resp.sessionHandle
TestAdmissionController.check_response(self.hs2_client.CloseSession(close_req))
-
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
@@ -503,7 +525,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
impalad_with_2g_mem.set_configuration_option('mem_limit', '1G')
impalad_with_2g_mem.execute_async("select sleep(1000)")
# Wait for statestore update to update the mem admitted in each node.
- sleep(STATESTORE_RPC_FREQUENCY_MS/1000)
+ sleep(STATESTORE_RPC_FREQUENCY_MS / 1000)
exec_options = copy(vector.get_value('exec_option'))
exec_options['mem_limit'] = "2G"
# Since Queuing is synchronous and we can't close the previous query till this
@@ -518,7 +540,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
- impalad_args= "--logbuflevel=-1 " + impalad_admission_ctrl_flags(max_requests=1,
+ impalad_args="--logbuflevel=-1 " + impalad_admission_ctrl_flags(max_requests=1,
max_queued=1, pool_max_mem=PROC_MEM_TEST_LIMIT),
statestored_args=_STATESTORED_ARGS)
def test_cancellation(self):
@@ -528,7 +550,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
client = impalad.service.create_beeswax_client()
try:
client.set_configuration_option("debug_action", "CRS_BEFORE_ADMISSION:SLEEP@2000")
- client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1 )
+ client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1)
handle = client.execute_async("select 1")
sleep(1)
client.close_query(handle)
@@ -638,7 +660,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
"9.00 MB but only 1.00 MB was available."
NUM_QUERIES = 5
profiles = self._execute_and_collect_profiles([STMT for i in xrange(NUM_QUERIES)],
- TIMEOUT_S, {'mem_limit':'9mb'})
+ TIMEOUT_S, {'mem_limit': '9mb'})
num_reasons = len([profile for profile in profiles if EXPECTED_REASON in profile])
assert num_reasons == NUM_QUERIES - 1, \
@@ -702,6 +724,95 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
assert num_q == expected_num, "There should be {0} running queries on either " \
"impalads: {0}".format(query_locations)
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_config_args(
+          fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+          llama_site_file="mem-limit-test-llama-site.xml", make_copy=True),
+      statestored_args=_STATESTORED_ARGS)
+  def test_pool_mem_limit_configs(self, vector):
+    """Runs functional tests for the max/min_query_mem_limit pool config attributes"""
+    exec_options = vector.get_value('exec_option')
+    # Set this to the default.
+    exec_options['exec_single_node_rows_threshold'] = 100
+    # Set num_nodes to 1 since it's easier to see one-to-one mapping of per_host and
+    # per_cluster values used in the test.
+    exec_options['num_nodes'] = 1
+    self.run_test_case('QueryTest/admission-max-min-mem-limits', vector)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_config_args(
+          fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+          llama_site_file="mem-limit-test-llama-site.xml",
+          additional_args="-default_pool_max_requests 1", make_copy=True),
+      statestored_args=_STATESTORED_ARGS)
+  def test_pool_config_change_while_queued(self, vector):
+    """Tests if the invalid checks work even if the query is queued. Makes sure the query
+    is not dequeued if the config is invalid and is promptly dequeued when it goes back
+    to being valid"""
+    pool_name = "invalidTestPool"
+    config_str = "max-query-mem-limit"
+    self.client.set_configuration_option('request_pool', pool_name)
+    # Setup to queue a query.
+    sleep_query_handle = self.client.execute_async("select sleep(10000)")
+    self.client.wait_for_admission_control(sleep_query_handle)
+    queued_query_handle = self.client.execute_async("select 1")
+    self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
+
+    # Change config to be invalid.
+    llama_site_path = os.path.join(RESOURCES_DIR, "copy-mem-limit-test-llama-site.xml")
+    config = ResourcePoolConfig(self.cluster.impalads[0].service, llama_site_path)
+    config.set_config_value(pool_name, config_str, 1)
+    # Close running query so the queued one gets a chance.
+    self.client.close_query(sleep_query_handle)
+
+    # Check latest queued reason changed
+    queued_reason = "Latest admission queue reason: Invalid pool config: the " \
+        "min_query_mem_limit is greater than the max_query_mem_limit" \
+        " (26214400 > 1)"
+    self.__wait_for_change_to_profile(queued_query_handle, queued_reason)
+
+    # Now change the config back to valid value and make sure the query is allowed to run.
+    config.set_config_value(pool_name, config_str, 0)
+    # Assert on the result so that a query which never gets dequeued fails the test.
+    # NOTE(review): assumes wait_for_finished_timeout() returns a falsy value on
+    # timeout instead of raising - confirm against the client implementation.
+    assert self.client.wait_for_finished_timeout(queued_query_handle, 20), \
+        "Queued query did not finish after the pool config became valid again"
+    self.client.close_query(queued_query_handle)
+
+    # Now do the same thing for change to pool.max-query-mem-limit such that it can no
+    # longer accommodate the largest min_reservation.
+    # Setup to queue a query.
+    sleep_query_handle = self.client.execute_async("select sleep(10000)")
+    self.client.wait_for_admission_control(sleep_query_handle)
+    queued_query_handle = self.client.execute_async(
+        "select * from functional_parquet.alltypes limit 1")
+    self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
+    # Change config to something less than what is required to accommodate the
+    # largest min_reservation (which in this case is 32.09 MB).
+    config.set_config_value(pool_name, config_str, 25 * 1024 * 1024)
+    # Close running query so the queued one gets a chance.
+    self.client.close_query(sleep_query_handle)
+    # Check latest queued reason changed
+    queued_reason = "minimum memory reservation is greater than memory available to " \
+        "the query for buffer reservations. Memory reservation needed given" \
+        " the current plan: 88.00 KB. Adjust either the mem_limit or the" \
+        " pool config (max-query-mem-limit, min-query-mem-limit) for the" \
+        " query to allow the query memory limit to be at least 32.09 MB."
+    self.__wait_for_change_to_profile(queued_query_handle, queued_reason, 5)
+    # Now change the config back to a reasonable value.
+    config.set_config_value(pool_name, config_str, 0)
+    assert self.client.wait_for_finished_timeout(queued_query_handle, 20), \
+        "Queued query did not finish after the pool config became valid again"
+    self.client.close_query(queued_query_handle)
+
+  def __wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
+    """Polls the runtime profile of 'query_handle' every 0.1s, up to 'timeout' * 10
+    times, until it contains 'search_string'. Fails with an AssertionError otherwise."""
+    # Initialized so the failure message is well-defined even if 'timeout' is not
+    # positive and the loop never runs.
+    profile = None
+    for _ in range(timeout * 10):
+      profile = self.client.get_runtime_profile(query_handle)
+      if search_string in profile:
+        return
+      sleep(0.1)
+    assert False, "Timed out waiting for change to profile\nSearch " \
+        "String: {0}\nProfile:\n{1}".format(search_string, str(profile))
+
class TestAdmissionControllerStress(TestAdmissionControllerBase):
"""Submits a number of queries (parameterized) with some delay between submissions
@@ -753,8 +864,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
num_queries = 30
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('submission_delay_ms') == 0)
- cls.ImpalaTestMatrix.add_constraint(\
- lambda v: v.get_value('round_robin_submission') == True)
+ cls.ImpalaTestMatrix.add_constraint(
+ lambda v: v.get_value('round_robin_submission'))
if num_queries is not None:
cls.ImpalaTestMatrix.add_constraint(
@@ -801,8 +912,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
The metrics names are shortened for brevity: 'admitted', 'queued', 'dequeued',
'rejected', 'released', and 'timed-out'.
"""
- metrics = {'admitted': 0, 'queued': 0, 'dequeued': 0, 'rejected' : 0,
- 'released': 0, 'timed-out': 0}
+ metrics = {'admitted': 0, 'queued': 0, 'dequeued': 0, 'rejected': 0,
+ 'released': 0, 'timed-out': 0}
for impalad in self.impalads:
keys = [metric_key(self.pool_name, 'total-%s' % short_name)
for short_name in metrics.keys()]
@@ -847,8 +958,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
current = self.get_admission_metrics()
log_metrics("wait_for_metric_changes, current=", current)
deltas = compute_metric_deltas(current, initial)
- delta_sum = sum([ deltas[x] for x in metric_names ])
- LOG.info("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",\
+ delta_sum = sum([deltas[x] for x in metric_names])
+ LOG.info("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",
delta_sum, deltas, expected_delta, metric_names)
if delta_sum >= expected_delta:
LOG.info("Found all %s metrics after %s seconds", delta_sum,
@@ -857,13 +968,13 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
assert (time() - start_time < STRESS_TIMEOUT),\
"Timed out waiting {0} seconds for metrics {1} delta {2} "\
"current {3} initial {4}" .format(
- STRESS_TIMEOUT, ','.join(metric_names), expected_delta, str(current), str(initial))
+ STRESS_TIMEOUT, ','.join(metric_names), expected_delta, str(current),
+ str(initial))
sleep(1)
def wait_for_statestore_updates(self, heartbeats):
"""Waits for a number of admission control statestore updates from all impalads."""
start_time = time()
- num_impalads = len(self.impalads)
init = dict()
curr = dict()
for impalad in self.impalads:
@@ -898,7 +1009,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
while len(self.executing_threads) < num_threads:
assert (time() - start_time < STRESS_TIMEOUT), ("Timed out waiting %s seconds for "
"%s admitted client rpcs to return. Only %s executing " % (
- STRESS_TIMEOUT, num_threads, len(self.executing_threads)))
+ STRESS_TIMEOUT, num_threads, len(self.executing_threads)))
sleep(0.1)
LOG.info("Found all %s admitted threads after %s seconds", num_threads,
round(time() - start_time, 1))
@@ -955,7 +1066,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# lock protects query_handle and shutdown, used by the main thread in teardown()
self.lock = threading.RLock()
self.query_handle = None
- self.shutdown = False # Set by the main thread when tearing down
+ self.shutdown = False # Set by the main thread when tearing down
def run(self):
client = None
@@ -1039,7 +1150,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# Sleep and wait for the query to be cancelled. The cancellation will
# set the state to EXCEPTION.
start_time = time()
- while (client.get_state(self.query_handle) != \
+ while (client.get_state(self.query_handle) !=
client.QUERY_STATES['EXCEPTION']):
assert (time() - start_time < STRESS_TIMEOUT),\
"Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
@@ -1060,7 +1171,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
admission control."""
for impalad in self.impalads:
queries_json = impalad.service.get_debug_webpage_json('/queries')
- for query in itertools.chain(queries_json['in_flight_queries'], \
+ for query in itertools.chain(queries_json['in_flight_queries'],
queries_json['completed_queries']):
if query['stmt_type'] == 'QUERY' or query['stmt_type'] == 'DML':
assert query['last_event'] != 'Registered' and \
@@ -1090,8 +1201,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
num_queries = vector.get_value('num_queries')
assert num_queries >= MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
- initial_metrics = self.get_admission_metrics();
- log_metrics("Initial metrics: ", initial_metrics);
+ initial_metrics = self.get_admission_metrics()
+ log_metrics("Initial metrics: ", initial_metrics)
for query_num in xrange(num_queries):
impalad = self.impalads[query_num % len(self.impalads)]
@@ -1109,7 +1220,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# without this thread explicitly ending them, so that the test can admit queries in
# discrete waves.
LOG.info("Wait for initial admission decisions")
- (metric_deltas, curr_metrics) = self.wait_for_metric_changes(\
+ (metric_deltas, curr_metrics) = self.wait_for_metric_changes(
['admitted', 'queued', 'rejected'], initial_metrics, num_queries)
# Also wait for the test threads that submitted the queries to start executing.
self.wait_for_admitted_threads(metric_deltas['admitted'])
@@ -1129,7 +1240,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
assert metric_deltas['queued'] <= MAX_NUM_QUEUED_QUERIES * len(self.impalads),\
"Queued too many queries: at least one daemon queued too many"
assert metric_deltas['rejected'] + metric_deltas['admitted'] +\
- metric_deltas['queued'] == num_queries ,\
+ metric_deltas['queued'] == num_queries,\
"Initial admission decisions don't add up to {0}: {1}".format(
num_queries, str(metric_deltas))
initial_metric_deltas = metric_deltas
@@ -1144,8 +1255,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# Admit queries in waves until all queries are done. A new wave of admission
# is started by killing some of the running queries.
while len(self.executing_threads) > 0:
- curr_metrics = self.get_consistent_admission_metrics(num_queries);
- log_metrics("Main loop, curr_metrics: ", curr_metrics);
+ curr_metrics = self.get_consistent_admission_metrics(num_queries)
+ log_metrics("Main loop, curr_metrics: ", curr_metrics)
num_to_end = len(self.executing_threads)
LOG.info("Main loop, will request %s queries to end", num_to_end)
self.end_admitted_queries(num_to_end)
@@ -1166,8 +1277,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# state or we may find an impalad dequeue more requests after we capture metrics.
self.wait_for_statestore_updates(10)
- final_metrics = self.get_consistent_admission_metrics(num_queries);
- log_metrics("Final metrics: ", final_metrics);
+ final_metrics = self.get_consistent_admission_metrics(num_queries)
+ log_metrics("Final metrics: ", final_metrics)
metric_deltas = compute_metric_deltas(final_metrics, initial_metrics)
assert metric_deltas['timed-out'] == 0
@@ -1212,8 +1323,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
- impalad_args=impalad_admission_ctrl_config_args(),
- statestored_args=_STATESTORED_ARGS)
+ impalad_args=impalad_admission_ctrl_config_args(
+ fs_allocation_file="fair-scheduler-test2.xml",
+ llama_site_file="llama-site-test2.xml"),
+ statestored_args=_STATESTORED_ARGS)
def test_admission_controller_with_configs(self, vector):
self.pool_name = 'root.queueB'
self.run_admission_test(vector, {'request_pool': self.pool_name})
@@ -1241,7 +1354,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# settings of the OS. It should be fine to continue anyway.
proc_limit = self.get_proc_limit()
if proc_limit != MEM_TEST_LIMIT:
- LOG.info("Warning: Process mem limit %s is not expected val %s", limit_val,
+ LOG.info("Warning: Process mem limit %s is not expected val %s", proc_limit,
MEM_TEST_LIMIT)
self.pool_name = 'default-pool'