You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by db...@apache.org on 2023/07/14 09:20:52 UTC
[impala] 01/03: IMPALA-12281: Disallow unsetting REQUEST_POOL if it is set by client
This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit c9aed342f54494d2f0b5e5aada472d6af6697adc
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Jul 12 15:20:56 2023 -0700
IMPALA-12281: Disallow unsetting REQUEST_POOL if it is set by client
IMPALA-12056 enabled child queries to unset REQUEST_POOL if it was set by
Frontend.java as part of executor group selection. However, the
implementation failed to call setRequest_pool_set_by_frontend(false) when
REQUEST_POOL was explicitly set by the client request through impala-shell
configuration. This caused child queries to always unset REQUEST_POOL if the
parent query was executed via impala-shell. This patch fixes the issue by
checking the query options that come from the client.
This patch also tidies up the null and empty REQUEST_POOL checks by using
StringUtils.isNotEmpty().
Testing:
- Add test cases to test_query_cpu_count_divisor_default
Change-Id: Ib5036859d51bc64f568da405f730c8f3ffebb742
Reviewed-on: http://gerrit.cloudera.org:8080/20189
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Riza Suminto <ri...@cloudera.com>
---
.../java/org/apache/impala/service/Frontend.java | 22 +++++++----
tests/custom_cluster/test_executor_groups.py | 45 +++++++++++++++++-----
2 files changed, 50 insertions(+), 17 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 18365487d..b2124a1a4 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -1943,7 +1944,8 @@ public class Frontend {
// If defined, request_pool can be a suffix of the group name prefix. For example
// group_set_prefix = root.queue1
// request_pool = queue1
- if (request_pool != null && !e.getExec_group_name_prefix().endsWith(request_pool)) {
+ if (StringUtils.isNotEmpty(request_pool)
+ && !e.getExec_group_name_prefix().endsWith(request_pool)) {
continue;
}
TExecutorGroupSet new_entry = new TExecutorGroupSet(e);
@@ -1965,7 +1967,8 @@ public class Frontend {
}
result.add(new_entry);
}
- if (executorGroupSets.size() > 0 && result.size() == 0 && request_pool != null) {
+ if (executorGroupSets.size() > 0 && result.size() == 0
+ && StringUtils.isNotEmpty(request_pool)) {
throw new AnalysisException("Request pool: " + request_pool
+ " does not map to any known executor group set.");
}
@@ -2018,6 +2021,9 @@ public class Frontend {
TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
boolean enable_replan = queryOptions.isEnable_replan();
+ final boolean clientSetRequestPool = queryOptions.isSetRequest_pool();
+ Preconditions.checkState(
+ !clientSetRequestPool || !queryOptions.getRequest_pool().isEmpty());
List<TExecutorGroupSet> originalExecutorGroupSets =
ExecutorMembershipSnapshot.getAllExecutorGroupSets();
@@ -2131,7 +2137,7 @@ public class Frontend {
}
if (notScalable) {
- setGroupNamePrefix(default_executor_group, req, group_set);
+ setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set);
addInfoString(
groupSetProfile, VERDICT, "Assign to first group because " + reason);
FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);
@@ -2187,7 +2193,7 @@ public class Frontend {
}
boolean matchFound = false;
- if (queryOptions.isSetRequest_pool()) {
+ if (clientSetRequestPool) {
if (!default_executor_group) {
Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith(
queryOptions.getRequest_pool()));
@@ -2216,7 +2222,7 @@ public class Frontend {
FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);
if (matchFound) {
- setGroupNamePrefix(default_executor_group, req, group_set);
+ setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set);
break;
}
@@ -2267,14 +2273,14 @@ public class Frontend {
return req;
}
- private static void setGroupNamePrefix(
- boolean default_executor_group, TExecRequest req, TExecutorGroupSet group_set) {
+ private static void setGroupNamePrefix(boolean default_executor_group,
+ boolean clientSetRequestPool, TExecRequest req, TExecutorGroupSet group_set) {
// Set the group name prefix in both the returned query options and
// the query context for non default group setup.
if (!default_executor_group) {
String namePrefix = group_set.getExec_group_name_prefix();
req.query_options.setRequest_pool(namePrefix);
- req.setRequest_pool_set_by_frontend(true);
+ req.setRequest_pool_set_by_frontend(!clientSetRequestPool);
if (req.query_exec_request != null) {
req.query_exec_request.query_ctx.setRequest_pool(namePrefix);
}
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index c42c06c15..dd8693d47 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -870,7 +870,9 @@ class TestExecutorGroups(CustomClusterTestSuite):
exec_group_set_prefix="root.large") == 1
def _set_query_options(self, query_options):
- """Set query options"""
+ """Set query options by running it as an SQL statement.
+ To mimic impala-shell behavior, use self.client.set_configuration() instead.
+ """
for k, v in query_options.items():
self.execute_query_expect_success(self.client, "SET {}='{}'".format(k, v))
@@ -904,9 +906,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
@UniqueDatabase.parametrize(sync_ddl=True)
@pytest.mark.execute_serially
def test_query_cpu_count_divisor_default(self, unique_database):
- # Expect to run the query on the small group by default.
coordinator_test_args = ""
self._setup_three_exec_group_cluster(coordinator_test_args)
+ self.client.clear_configuration()
+
+ # Expect to run the query on the small group by default.
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.small-group", "EffectiveParallelism: 11",
@@ -957,22 +961,36 @@ class TestExecutorGroups(CustomClusterTestSuite):
self._run_query_and_verify_profile(compute_stats_query,
["ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
- ["Executor Group:"])
+ ["Query Options (set by configuration): REQUEST_POOL=",
+ "Executor Group:"])
self._verify_total_admitted_queries("root.small", 4)
self._verify_total_admitted_queries("root.large", 2)
- # Test that child queries follow REQUEST_POOL that was set by client.
+ # Test that child queries follow REQUEST_POOL that is set through client
+ # configuration. Two child queries should all run in root.small.
+ self.client.set_configuration({'REQUEST_POOL': 'root.small'})
+ self._run_query_and_verify_profile(compute_stats_query,
+ ["Query Options (set by configuration): REQUEST_POOL=root.small",
+ "ExecutorGroupsConsidered: 1",
+ "Verdict: Assign to first group because query is not auto-scalable"],
+ ["Executor Group:"])
+ self._verify_total_admitted_queries("root.small", 6)
+ self.client.clear_configuration()
+
+ # Test that child queries follow REQUEST_POOL that is set through SQL statement.
# Two child queries should all run in root.large.
self._set_query_options({'REQUEST_POOL': 'root.large'})
self._run_query_and_verify_profile(compute_stats_query,
- ["ExecutorGroupsConsidered: 1",
+ ["Query Options (set by configuration): REQUEST_POOL=root.large",
+ "ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
["Executor Group:"])
self._verify_total_admitted_queries("root.large", 4)
# Test that REQUEST_POOL will override executor group selection
self._run_query_and_verify_profile(CPU_TEST_QUERY,
- ["Executor Group: root.large-group",
+ ["Query Options (set by configuration): REQUEST_POOL=root.large",
+ "Executor Group: root.large-group",
("Verdict: query option REQUEST_POOL=root.large is set. "
"Memory and cpu limit checking is skipped."),
"EffectiveParallelism: 13", "ExecutorGroupsConsidered: 1"])
@@ -982,7 +1000,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
'COMPUTE_PROCESSING_COST': 'false',
'REQUEST_POOL': 'root.large'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
- ["Executor Group: root.large-group",
+ ["Query Options (set by configuration): REQUEST_POOL=root.large",
+ "Executor Group: root.large-group",
("Verdict: query option REQUEST_POOL=root.large is set. "
"Memory and cpu limit checking is skipped."),
"ExecutorGroupsConsidered: 1"],
@@ -993,6 +1012,14 @@ class TestExecutorGroups(CustomClusterTestSuite):
'REQUEST_POOL': '',
'COMPUTE_PROCESSING_COST': 'true'})
+ # Test that empty REQUEST_POOL should have no impact.
+ self.client.set_configuration({'REQUEST_POOL': ''})
+ self._run_query_and_verify_profile(CPU_TEST_QUERY,
+ ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
+ "Verdict: Match"],
+ ["Query Options (set by configuration): REQUEST_POOL="])
+ self.client.clear_configuration()
+
# Test that GROUPING_TEST_QUERY will get assigned to the large group.
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
@@ -1160,10 +1187,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
# END testing insert + MAX_FS_WRITER
# Check resource pools on the Web queries site and admission site
- self._verify_query_num_for_resource_pool("root.small", 4)
+ self._verify_query_num_for_resource_pool("root.small", 7)
self._verify_query_num_for_resource_pool("root.tiny", 4)
self._verify_query_num_for_resource_pool("root.large", 12)
- self._verify_total_admitted_queries("root.small", 5)
+ self._verify_total_admitted_queries("root.small", 8)
self._verify_total_admitted_queries("root.tiny", 6)
self._verify_total_admitted_queries("root.large", 16)