You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by db...@apache.org on 2023/07/14 09:20:52 UTC

[impala] 01/03: IMPALA-12281: Disallow unsetting REQUEST_POOL if it is set by client

This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c9aed342f54494d2f0b5e5aada472d6af6697adc
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Jul 12 15:20:56 2023 -0700

    IMPALA-12281: Disallow unsetting REQUEST_POOL if it is set by client
    
    IMPALA-12056 enabled child queries to unset REQUEST_POOL if it was set
    by Frontend.java as part of executor group selection. However, the
    implementation failed to call setRequest_pool_set_by_frontend(false)
    when REQUEST_POOL is explicitly set by the client request through
    impala-shell configuration. This caused child queries to always unset
    REQUEST_POOL if the parent query was executed via impala-shell. This
    patch fixes the issue by checking the query options that come from the
    client.
    
    This patch also tidies up the null and empty REQUEST_POOL checks by
    using StringUtils.isNotEmpty().
    
    Testing:
    - Add a test case to test_query_cpu_count_divisor_default
    
    Change-Id: Ib5036859d51bc64f568da405f730c8f3ffebb742
    Reviewed-on: http://gerrit.cloudera.org:8080/20189
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Riza Suminto <ri...@cloudera.com>
---
 .../java/org/apache/impala/service/Frontend.java   | 22 +++++++----
 tests/custom_cluster/test_executor_groups.py       | 45 +++++++++++++++++-----
 2 files changed, 50 insertions(+), 17 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 18365487d..b2124a1a4 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -1943,7 +1944,8 @@ public class Frontend {
       // If defined, request_pool can be a suffix of the group name prefix. For example
       //   group_set_prefix = root.queue1
       //   request_pool = queue1
-      if (request_pool != null && !e.getExec_group_name_prefix().endsWith(request_pool)) {
+      if (StringUtils.isNotEmpty(request_pool)
+          && !e.getExec_group_name_prefix().endsWith(request_pool)) {
         continue;
       }
       TExecutorGroupSet new_entry = new TExecutorGroupSet(e);
@@ -1965,7 +1967,8 @@ public class Frontend {
       }
       result.add(new_entry);
     }
-    if (executorGroupSets.size() > 0 && result.size() == 0 && request_pool != null) {
+    if (executorGroupSets.size() > 0 && result.size() == 0
+        && StringUtils.isNotEmpty(request_pool)) {
       throw new AnalysisException("Request pool: " + request_pool
           + " does not map to any known executor group set.");
     }
@@ -2018,6 +2021,9 @@ public class Frontend {
 
     TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
     boolean enable_replan = queryOptions.isEnable_replan();
+    final boolean clientSetRequestPool = queryOptions.isSetRequest_pool();
+    Preconditions.checkState(
+        !clientSetRequestPool || !queryOptions.getRequest_pool().isEmpty());
 
     List<TExecutorGroupSet> originalExecutorGroupSets =
         ExecutorMembershipSnapshot.getAllExecutorGroupSets();
@@ -2131,7 +2137,7 @@ public class Frontend {
       }
 
       if (notScalable) {
-        setGroupNamePrefix(default_executor_group, req, group_set);
+        setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set);
         addInfoString(
             groupSetProfile, VERDICT, "Assign to first group because " + reason);
         FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);
@@ -2187,7 +2193,7 @@ public class Frontend {
       }
 
       boolean matchFound = false;
-      if (queryOptions.isSetRequest_pool()) {
+      if (clientSetRequestPool) {
         if (!default_executor_group) {
           Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith(
               queryOptions.getRequest_pool()));
@@ -2216,7 +2222,7 @@ public class Frontend {
       FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);
 
       if (matchFound) {
-        setGroupNamePrefix(default_executor_group, req, group_set);
+        setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set);
         break;
       }
 
@@ -2267,14 +2273,14 @@ public class Frontend {
     return req;
   }
 
-  private static void setGroupNamePrefix(
-      boolean default_executor_group, TExecRequest req, TExecutorGroupSet group_set) {
+  private static void setGroupNamePrefix(boolean default_executor_group,
+      boolean clientSetRequestPool, TExecRequest req, TExecutorGroupSet group_set) {
     // Set the group name prefix in both the returned query options and
     // the query context for non default group setup.
     if (!default_executor_group) {
       String namePrefix = group_set.getExec_group_name_prefix();
       req.query_options.setRequest_pool(namePrefix);
-      req.setRequest_pool_set_by_frontend(true);
+      req.setRequest_pool_set_by_frontend(!clientSetRequestPool);
       if (req.query_exec_request != null) {
         req.query_exec_request.query_ctx.setRequest_pool(namePrefix);
       }
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index c42c06c15..dd8693d47 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -870,7 +870,9 @@ class TestExecutorGroups(CustomClusterTestSuite):
                                          exec_group_set_prefix="root.large") == 1
 
   def _set_query_options(self, query_options):
-    """Set query options"""
+    """Set query options by running it as an SQL statement.
+    To mimic impala-shell behavior, use self.client.set_configuration() instead.
+    """
     for k, v in query_options.items():
       self.execute_query_expect_success(self.client, "SET {}='{}'".format(k, v))
 
@@ -904,9 +906,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
   @UniqueDatabase.parametrize(sync_ddl=True)
   @pytest.mark.execute_serially
   def test_query_cpu_count_divisor_default(self, unique_database):
-    # Expect to run the query on the small group by default.
     coordinator_test_args = ""
     self._setup_three_exec_group_cluster(coordinator_test_args)
+    self.client.clear_configuration()
+
+    # Expect to run the query on the small group by default.
     self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
         ["Executor Group: root.small-group", "EffectiveParallelism: 11",
@@ -957,22 +961,36 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._run_query_and_verify_profile(compute_stats_query,
         ["ExecutorGroupsConsidered: 1",
          "Verdict: Assign to first group because query is not auto-scalable"],
-        ["Executor Group:"])
+        ["Query Options (set by configuration): REQUEST_POOL=",
+         "Executor Group:"])
     self._verify_total_admitted_queries("root.small", 4)
     self._verify_total_admitted_queries("root.large", 2)
 
-    # Test that child queries follow REQUEST_POOL that was set by client.
+    # Test that child queries follow REQUEST_POOL that is set through client
+    # configuration. Two child queries should all run in root.small.
+    self.client.set_configuration({'REQUEST_POOL': 'root.small'})
+    self._run_query_and_verify_profile(compute_stats_query,
+        ["Query Options (set by configuration): REQUEST_POOL=root.small",
+         "ExecutorGroupsConsidered: 1",
+         "Verdict: Assign to first group because query is not auto-scalable"],
+        ["Executor Group:"])
+    self._verify_total_admitted_queries("root.small", 6)
+    self.client.clear_configuration()
+
+    # Test that child queries follow REQUEST_POOL that is set through SQL statement.
     # Two child queries should all run in root.large.
     self._set_query_options({'REQUEST_POOL': 'root.large'})
     self._run_query_and_verify_profile(compute_stats_query,
-        ["ExecutorGroupsConsidered: 1",
+        ["Query Options (set by configuration): REQUEST_POOL=root.large",
+         "ExecutorGroupsConsidered: 1",
          "Verdict: Assign to first group because query is not auto-scalable"],
         ["Executor Group:"])
     self._verify_total_admitted_queries("root.large", 4)
 
     # Test that REQUEST_POOL will override executor group selection
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
-        ["Executor Group: root.large-group",
+        ["Query Options (set by configuration): REQUEST_POOL=root.large",
+         "Executor Group: root.large-group",
          ("Verdict: query option REQUEST_POOL=root.large is set. "
           "Memory and cpu limit checking is skipped."),
          "EffectiveParallelism: 13", "ExecutorGroupsConsidered: 1"])
@@ -982,7 +1000,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
       'COMPUTE_PROCESSING_COST': 'false',
       'REQUEST_POOL': 'root.large'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
-        ["Executor Group: root.large-group",
+        ["Query Options (set by configuration): REQUEST_POOL=root.large",
+         "Executor Group: root.large-group",
          ("Verdict: query option REQUEST_POOL=root.large is set. "
           "Memory and cpu limit checking is skipped."),
          "ExecutorGroupsConsidered: 1"],
@@ -993,6 +1012,14 @@ class TestExecutorGroups(CustomClusterTestSuite):
       'REQUEST_POOL': '',
       'COMPUTE_PROCESSING_COST': 'true'})
 
+    # Test that empty REQUEST_POOL should have no impact.
+    self.client.set_configuration({'REQUEST_POOL': ''})
+    self._run_query_and_verify_profile(CPU_TEST_QUERY,
+        ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
+         "Verdict: Match"],
+        ["Query Options (set by configuration): REQUEST_POOL="])
+    self.client.clear_configuration()
+
     # Test that GROUPING_TEST_QUERY will get assigned to the large group.
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
         ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
@@ -1160,10 +1187,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # END testing insert + MAX_FS_WRITER
 
     # Check resource pools on the Web queries site and admission site
-    self._verify_query_num_for_resource_pool("root.small", 4)
+    self._verify_query_num_for_resource_pool("root.small", 7)
     self._verify_query_num_for_resource_pool("root.tiny", 4)
     self._verify_query_num_for_resource_pool("root.large", 12)
-    self._verify_total_admitted_queries("root.small", 5)
+    self._verify_total_admitted_queries("root.small", 8)
     self._verify_total_admitted_queries("root.tiny", 6)
     self._verify_total_admitted_queries("root.large", 16)