You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2023/03/23 16:20:41 UTC
[impala] 02/03: IMPALA-12017: Skip memory and cpu limit check if REQUEST_POOL is set
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit f2b01c1ddb7d5f002d07fdda12afe9300ac316e8
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Mar 22 11:50:35 2023 -0700
IMPALA-12017: Skip memory and cpu limit check if REQUEST_POOL is set
Memory and cpu limit checking in executor group
selection (Frontend.java) should be skipped if REQUEST_POOL query option
is set. Setting REQUEST_POOL means the user is explicitly specifying the pool
to run the query in, regardless of the memory and cpu limits.
Testing:
- Add test cases in test_query_cpu_count_divisor_default
Change-Id: I14bf7fe71e2dda1099651b3edf62480e1fdbf845
Reviewed-on: http://gerrit.cloudera.org:8080/19645
Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Riza Suminto <ri...@cloudera.com>
---
.../java/org/apache/impala/service/Frontend.java | 21 ++++--
tests/custom_cluster/test_executor_groups.py | 74 +++++++++++++++++-----
2 files changed, 75 insertions(+), 20 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 28f097f3a..e59bf90bc 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2059,6 +2059,7 @@ public class Frontend {
}
// Counters about this group set.
+ int available_cores = expectedTotalCores(group_set);
String profileName = "Executor group " + (i + 1);
if (group_set.isSetExec_group_name_prefix()
&& !group_set.getExec_group_name_prefix().isEmpty()) {
@@ -2067,6 +2068,7 @@ public class Frontend {
TRuntimeProfileNode groupSetProfile = createTRuntimeProfileNode(profileName);
addCounter(groupSetProfile,
new TCounter(MEMORY_MAX, TUnit.BYTES, group_set.getMax_mem_limit()));
+ addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, available_cores));
FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);
// Find out the per host memory estimated from two possible sources.
@@ -2089,28 +2091,38 @@ public class Frontend {
boolean cpuReqSatisfied = true;
int scaled_cores_requirement = -1;
- int available_cores = -1;
if (ProcessingCost.isComputeCost(queryOptions)) {
Preconditions.checkState(cores_requirement > 0);
scaled_cores_requirement = (int) Math.min(Integer.MAX_VALUE,
Math.ceil(
cores_requirement / BackendConfig.INSTANCE.getQueryCpuCountDivisor()));
- available_cores = expectedTotalCores(group_set);
cpuReqSatisfied = scaled_cores_requirement <= available_cores;
- addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, available_cores));
addCounter(
groupSetProfile, new TCounter(CPU_ASK, TUnit.UNIT, scaled_cores_requirement));
addCounter(groupSetProfile,
new TCounter(EFFECTIVE_PARALLELISM, TUnit.UNIT, cores_requirement));
}
- if (memReqSatisfied && cpuReqSatisfied) {
+ boolean matchFound = false;
+ if (queryOptions.isSetRequest_pool()) {
+ if (!default_executor_group) {
+ Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith(
+ queryOptions.getRequest_pool()));
+ }
+ reason = "query option REQUEST_POOL=" + queryOptions.getRequest_pool()
+ + " is set. Memory and cpu limit checking is skipped.";
+ addInfoString(groupSetProfile, VERDICT, reason);
+ matchFound = true;
+ } else if (memReqSatisfied && cpuReqSatisfied) {
reason = "suitable group found (estimated per-host memory="
+ PrintUtils.printBytes(per_host_mem_estimate)
+ ", estimated cpu cores required=" + cores_requirement
+ ", scaled cpu cores=" + scaled_cores_requirement + ")";
addInfoString(groupSetProfile, VERDICT, "Match");
+ matchFound = true;
+ }
+ if (matchFound) {
// Set the group name prefix in both the returned query options and
// the query context for non default group setup.
if (!default_executor_group) {
@@ -2120,7 +2132,6 @@ public class Frontend {
req.query_exec_request.query_ctx.setRequest_pool(namePrefix);
}
}
-
break;
}
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 7df6c27bb..4ae476e81 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -22,6 +22,7 @@ from builtins import range
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.util.concurrent_workload import ConcurrentWorkload
+import copy
import json
import logging
import os
@@ -33,9 +34,12 @@ LOG = logging.getLogger("test_auto_scaling")
# Non-trivial query that gets scheduled on all executors within a group.
TEST_QUERY = "select count(*) from functional.alltypes where month + random() < 3"
-# A query to test Cpu requirement. Estimated memory per host is 37MB.
+# A query to test CPU requirement. Estimated memory per host is 37MB.
CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;"
+# Default query option to use for testing CPU requirement.
+CPU_DOP_OPTIONS = {'MT_DOP': '2', 'COMPUTE_PROCESSING_COST': 'true'}
+
DEFAULT_RESOURCE_POOL = "default-pool"
@@ -785,16 +789,16 @@ class TestExecutorGroups(CustomClusterTestSuite):
result = self.execute_query_expect_success(self.client, LARGE_QUERY)
assert "Executor Group: root.large-group" in str(result.runtime_profile)
- # Force to run the large query on the small group should fail
+ # Force to run the large query on the small group.
+ # Query should run successfully since exec group memory limit is ignored.
self.client.set_configuration({'request_pool': 'small'})
- result = self.execute_query_expect_failure(self.client, LARGE_QUERY)
- assert ("The query does not fit largest executor group sets. "
- "Reason: not enough per-host memory") in str(result)
+ result = self.execute_query_expect_success(self.client, LARGE_QUERY)
+ assert ("Verdict: query option REQUEST_POOL=small is set. "
+ "Memory and cpu limit checking is skipped.") in str(result.runtime_profile)
self.client.close()
- def _run_with_compute_processing_cost(self, coordinator_test_args, TEST_QUERY,
- expected_strings_in_profile):
+ def _setup_three_exec_group_cluster(self, coordinator_test_args):
# The path to resources directory which contains the admission control config files.
RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
"resources")
@@ -840,37 +844,77 @@ class TestExecutorGroups(CustomClusterTestSuite):
assert self._get_num_executor_groups(only_healthy=True,
exec_group_set_prefix="root.large") == 1
- # assert that 'expected_profile' exist in query profile
- self.execute_query_expect_success(self.client, 'SET MT_DOP=2;')
- self.execute_query_expect_success(self.client, 'SET COMPUTE_PROCESSING_COST=1;')
- result = self.execute_query_expect_success(self.client, TEST_QUERY)
+ def _run_query_and_verify_profile(self, query, query_options,
+ expected_strings_in_profile, not_expected_in_profile=[]):
+ """Run 'query' with given 'query_options'. Assert existence of
+ 'expected_strings_in_profile' and nonexistence of 'not_expected_in_profile'
+ in query profile.
+ Caller is responsible to close self.client at the end of test."""
+ for k, v in query_options.items():
+ self.execute_query_expect_success(self.client, "SET {}='{}';".format(k, v))
+ result = self.execute_query_expect_success(self.client, query)
for expected_profile in expected_strings_in_profile:
assert expected_profile in str(result.runtime_profile)
- self.client.close()
+ for not_expected in not_expected_in_profile:
+ assert not_expected not in str(result.runtime_profile)
@pytest.mark.execute_serially
def test_query_cpu_count_divisor_default(self):
# Expect to run the query on the small group by default.
coordinator_test_args = ""
- self._run_with_compute_processing_cost(coordinator_test_args, CPU_TEST_QUERY,
+ self._setup_three_exec_group_cluster(coordinator_test_args)
+ self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS,
["Executor Group: root.small-group", "EffectiveParallelism: 5",
"ExecutorGroupsConsidered: 2"])
+ # Test disabling COMPUTE_PROCESSING_COST and not setting REQUEST_POOL
+ options = copy.deepcopy(CPU_DOP_OPTIONS)
+ options['COMPUTE_PROCESSING_COST'] = 'false'
+ self._run_query_and_verify_profile(CPU_TEST_QUERY, options,
+ ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
+ "Verdict: Match"],
+ ["EffectiveParallelism:", "CpuAsk:"])
+
+ # Test that REQUEST_POOL will override executor group selection
+ options['COMPUTE_PROCESSING_COST'] = 'true'
+ options['REQUEST_POOL'] = 'root.large'
+ self._run_query_and_verify_profile(CPU_TEST_QUERY, options,
+ ["Executor Group: root.large-group",
+ ("Verdict: query option REQUEST_POOL=root.large is set. "
+ "Memory and cpu limit checking is skipped."),
+ "EffectiveParallelism: 7", "ExecutorGroupsConsidered: 1"])
+
+ # Test setting REQUEST_POOL and disabling COMPUTE_PROCESSING_COST
+ options['COMPUTE_PROCESSING_COST'] = 'false'
+ options['REQUEST_POOL'] = 'root.large'
+ self._run_query_and_verify_profile(CPU_TEST_QUERY, options,
+ ["Executor Group: root.large-group",
+ ("Verdict: query option REQUEST_POOL=root.large is set. "
+ "Memory and cpu limit checking is skipped."),
+ "ExecutorGroupsConsidered: 1"],
+ ["EffectiveParallelism:", "CpuAsk:"])
+
+ self.client.close()
+
@pytest.mark.execute_serially
def test_query_cpu_count_divisor_two(self):
# Expect to run the query on the tiny group
coordinator_test_args = "-query_cpu_count_divisor=2 "
- self._run_with_compute_processing_cost(coordinator_test_args, CPU_TEST_QUERY,
+ self._setup_three_exec_group_cluster(coordinator_test_args)
+ self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS,
["Executor Group: root.tiny-group", "EffectiveParallelism: 3",
"ExecutorGroupsConsidered: 1"])
+ self.client.close()
@pytest.mark.execute_serially
def test_query_cpu_count_divisor_fraction(self):
# Expect to run the query on the large group
coordinator_test_args = "-query_cpu_count_divisor=0.2 "
- self._run_with_compute_processing_cost(coordinator_test_args, CPU_TEST_QUERY,
+ self._setup_three_exec_group_cluster(coordinator_test_args)
+ self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS,
["Executor Group: root.large-group", "EffectiveParallelism: 7",
"ExecutorGroupsConsidered: 3"])
+ self.client.close()
@pytest.mark.execute_serially
def test_per_exec_group_set_metrics(self):