You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2023/03/23 16:20:41 UTC

[impala] 02/03: IMPALA-12017: Skip memory and cpu limit check if REQUEST_POOL is set

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f2b01c1ddb7d5f002d07fdda12afe9300ac316e8
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Mar 22 11:50:35 2023 -0700

    IMPALA-12017: Skip memory and cpu limit check if REQUEST_POOL is set
    
    Memory and cpu limit checking in executor group
    selection (Frontend.java) should be skipped if REQUEST_POOL query option
    is set. Setting REQUEST_POOL means user is specifying pool to run the
    query regardless of memory and cpu limit.
    
    Testing:
    - Add test cases in test_query_cpu_count_divisor_default
    
    Change-Id: I14bf7fe71e2dda1099651b3edf62480e1fdbf845
    Reviewed-on: http://gerrit.cloudera.org:8080/19645
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
---
 .../java/org/apache/impala/service/Frontend.java   | 21 ++++--
 tests/custom_cluster/test_executor_groups.py       | 74 +++++++++++++++++-----
 2 files changed, 75 insertions(+), 20 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 28f097f3a..e59bf90bc 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2059,6 +2059,7 @@ public class Frontend {
       }
 
       // Counters about this group set.
+      int available_cores = expectedTotalCores(group_set);
       String profileName = "Executor group " + (i + 1);
       if (group_set.isSetExec_group_name_prefix()
           && !group_set.getExec_group_name_prefix().isEmpty()) {
@@ -2067,6 +2068,7 @@ public class Frontend {
       TRuntimeProfileNode groupSetProfile = createTRuntimeProfileNode(profileName);
       addCounter(groupSetProfile,
           new TCounter(MEMORY_MAX, TUnit.BYTES, group_set.getMax_mem_limit()));
+      addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, available_cores));
       FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);
 
       // Find out the per host memory estimated from two possible sources.
@@ -2089,28 +2091,38 @@ public class Frontend {
 
       boolean cpuReqSatisfied = true;
       int scaled_cores_requirement = -1;
-      int available_cores = -1;
       if (ProcessingCost.isComputeCost(queryOptions)) {
         Preconditions.checkState(cores_requirement > 0);
         scaled_cores_requirement = (int) Math.min(Integer.MAX_VALUE,
             Math.ceil(
                 cores_requirement / BackendConfig.INSTANCE.getQueryCpuCountDivisor()));
-        available_cores = expectedTotalCores(group_set);
         cpuReqSatisfied = scaled_cores_requirement <= available_cores;
-        addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, available_cores));
         addCounter(
             groupSetProfile, new TCounter(CPU_ASK, TUnit.UNIT, scaled_cores_requirement));
         addCounter(groupSetProfile,
             new TCounter(EFFECTIVE_PARALLELISM, TUnit.UNIT, cores_requirement));
       }
 
-      if (memReqSatisfied && cpuReqSatisfied) {
+      boolean matchFound = false;
+      if (queryOptions.isSetRequest_pool()) {
+        if (!default_executor_group) {
+          Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith(
+              queryOptions.getRequest_pool()));
+        }
+        reason = "query option REQUEST_POOL=" + queryOptions.getRequest_pool()
+            + " is set. Memory and cpu limit checking is skipped.";
+        addInfoString(groupSetProfile, VERDICT, reason);
+        matchFound = true;
+      } else if (memReqSatisfied && cpuReqSatisfied) {
         reason = "suitable group found (estimated per-host memory="
             + PrintUtils.printBytes(per_host_mem_estimate)
             + ", estimated cpu cores required=" + cores_requirement
             + ", scaled cpu cores=" + scaled_cores_requirement + ")";
         addInfoString(groupSetProfile, VERDICT, "Match");
+        matchFound = true;
+      }
 
+      if (matchFound) {
         // Set the group name prefix in both the returned query options and
         // the query context for non default group setup.
         if (!default_executor_group) {
@@ -2120,7 +2132,6 @@ public class Frontend {
             req.query_exec_request.query_ctx.setRequest_pool(namePrefix);
           }
         }
-
         break;
       }
 
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 7df6c27bb..4ae476e81 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -22,6 +22,7 @@ from builtins import range
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.util.concurrent_workload import ConcurrentWorkload
 
+import copy
 import json
 import logging
 import os
@@ -33,9 +34,12 @@ LOG = logging.getLogger("test_auto_scaling")
 # Non-trivial query that gets scheduled on all executors within a group.
 TEST_QUERY = "select count(*) from functional.alltypes where month + random() < 3"
 
-# A query to test Cpu requirement. Estimated memory per host is 37MB.
+# A query to test CPU requirement. Estimated memory per host is 37MB.
 CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;"
 
+# Default query option to use for testing CPU requirement.
+CPU_DOP_OPTIONS = {'MT_DOP': '2', 'COMPUTE_PROCESSING_COST': 'true'}
+
 DEFAULT_RESOURCE_POOL = "default-pool"
 
 
@@ -785,16 +789,16 @@ class TestExecutorGroups(CustomClusterTestSuite):
     result = self.execute_query_expect_success(self.client, LARGE_QUERY)
     assert "Executor Group: root.large-group" in str(result.runtime_profile)
 
-    # Force to run the large query on the small group should fail
+    # Force to run the large query on the small group.
+    # Query should run successfully since exec group memory limit is ignored.
     self.client.set_configuration({'request_pool': 'small'})
-    result = self.execute_query_expect_failure(self.client, LARGE_QUERY)
-    assert ("The query does not fit largest executor group sets. "
-        "Reason: not enough per-host memory") in str(result)
+    result = self.execute_query_expect_success(self.client, LARGE_QUERY)
+    assert ("Verdict: query option REQUEST_POOL=small is set. "
+        "Memory and cpu limit checking is skipped.") in str(result.runtime_profile)
 
     self.client.close()
 
-  def _run_with_compute_processing_cost(self, coordinator_test_args, TEST_QUERY,
-      expected_strings_in_profile):
+  def _setup_three_exec_group_cluster(self, coordinator_test_args):
     # The path to resources directory which contains the admission control config files.
     RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
                                  "resources")
@@ -840,37 +844,77 @@ class TestExecutorGroups(CustomClusterTestSuite):
     assert self._get_num_executor_groups(only_healthy=True,
                                          exec_group_set_prefix="root.large") == 1
 
-    # assert that 'expected_profile' exist in query profile
-    self.execute_query_expect_success(self.client, 'SET MT_DOP=2;')
-    self.execute_query_expect_success(self.client, 'SET COMPUTE_PROCESSING_COST=1;')
-    result = self.execute_query_expect_success(self.client, TEST_QUERY)
+  def _run_query_and_verify_profile(self, query, query_options,
+      expected_strings_in_profile, not_expected_in_profile=[]):
+    """Run 'query' with given 'query_options'. Assert existence of
+    'expected_strings_in_profile' and nonexistence of 'not_expected_in_profile'
+    in query profile.
+    Caller is reponsible to close self.client at the end of test."""
+    for k, v in query_options.items():
+      self.execute_query_expect_success(self.client, "SET {}='{}';".format(k, v))
+    result = self.execute_query_expect_success(self.client, query)
     for expected_profile in expected_strings_in_profile:
       assert expected_profile in str(result.runtime_profile)
-    self.client.close()
+    for not_expected in not_expected_in_profile:
+      assert not_expected not in str(result.runtime_profile)
 
   @pytest.mark.execute_serially
   def test_query_cpu_count_divisor_default(self):
     # Expect to run the query on the small group by default.
     coordinator_test_args = ""
-    self._run_with_compute_processing_cost(coordinator_test_args, CPU_TEST_QUERY,
+    self._setup_three_exec_group_cluster(coordinator_test_args)
+    self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS,
         ["Executor Group: root.small-group", "EffectiveParallelism: 5",
          "ExecutorGroupsConsidered: 2"])
 
+    # Test disabling COMPUTE_PROCESING_COST and not setting REQUEST_POOL
+    options = copy.deepcopy(CPU_DOP_OPTIONS)
+    options['COMPUTE_PROCESSING_COST'] = 'false'
+    self._run_query_and_verify_profile(CPU_TEST_QUERY, options,
+        ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
+         "Verdict: Match"],
+        ["EffectiveParallelism:", "CpuAsk:"])
+
+    # Test that REQUEST_POOL will override executor group selection
+    options['COMPUTE_PROCESSING_COST'] = 'true'
+    options['REQUEST_POOL'] = 'root.large'
+    self._run_query_and_verify_profile(CPU_TEST_QUERY, options,
+        ["Executor Group: root.large-group",
+         ("Verdict: query option REQUEST_POOL=root.large is set. "
+          "Memory and cpu limit checking is skipped."),
+         "EffectiveParallelism: 7", "ExecutorGroupsConsidered: 1"])
+
+    # Test setting REQUEST_POOL and disabling COMPUTE_PROCESSING_COST
+    options['COMPUTE_PROCESSING_COST'] = 'false'
+    options['REQUEST_POOL'] = 'root.large'
+    self._run_query_and_verify_profile(CPU_TEST_QUERY, options,
+        ["Executor Group: root.large-group",
+         ("Verdict: query option REQUEST_POOL=root.large is set. "
+          "Memory and cpu limit checking is skipped."),
+         "ExecutorGroupsConsidered: 1"],
+        ["EffectiveParallelism:", "CpuAsk:"])
+
+    self.client.close()
+
   @pytest.mark.execute_serially
   def test_query_cpu_count_divisor_two(self):
     # Expect to run the query on the tiny group
     coordinator_test_args = "-query_cpu_count_divisor=2 "
-    self._run_with_compute_processing_cost(coordinator_test_args, CPU_TEST_QUERY,
+    self._setup_three_exec_group_cluster(coordinator_test_args)
+    self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS,
         ["Executor Group: root.tiny-group", "EffectiveParallelism: 3",
          "ExecutorGroupsConsidered: 1"])
+    self.client.close()
 
   @pytest.mark.execute_serially
   def test_query_cpu_count_divisor_fraction(self):
     # Expect to run the query on the large group
     coordinator_test_args = "-query_cpu_count_divisor=0.2 "
-    self._run_with_compute_processing_cost(coordinator_test_args, CPU_TEST_QUERY,
+    self._setup_three_exec_group_cluster(coordinator_test_args)
+    self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS,
         ["Executor Group: root.large-group", "EffectiveParallelism: 7",
          "ExecutorGroupsConsidered: 3"])
+    self.client.close()
 
   @pytest.mark.execute_serially
   def test_per_exec_group_set_metrics(self):