You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2024/01/04 18:42:02 UTC

(impala) 01/03: IMPALA-12654: Add query option QUERY_CPU_COUNT_DIVISOR

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ac8ffa9125fc3be6f4ceb958ea4930b7834f9292
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Dec 18 15:47:27 2023 -0800

    IMPALA-12654: Add query option QUERY_CPU_COUNT_DIVISOR
    
    IMPALA-11604 adds a hidden backend flag named query_cpu_count_divisor to
    allow oversubscribing CPU cores beyond what is available in the
    executor group set. This patch adds a query option with the same name
    and function so that CPU core matching can be tuned for individual
    queries. The query option takes precedence over the flag.
    
    Testing:
    - Add test cases in test_executor_groups.py and query-options-test.cc
    
    Change-Id: I34ab47bd67509a02790c3caedb3fde4d1b6eaa78
    Reviewed-on: http://gerrit.cloudera.org:8080/20819
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-option-parser.h               | 18 +++++++++++++++
 be/src/service/query-options-test.cc               | 17 ++++++++++++++
 be/src/service/query-options.cc                    |  7 ++++++
 be/src/service/query-options.h                     |  4 +++-
 be/src/util/backend-gflag-util.cc                  |  1 +
 common/thrift/ImpalaService.thrift                 |  9 ++++++++
 common/thrift/Query.thrift                         |  3 +++
 .../java/org/apache/impala/service/Frontend.java   | 11 +++++----
 tests/custom_cluster/test_executor_groups.py       | 26 +++++++++++++++++++---
 9 files changed, 88 insertions(+), 8 deletions(-)

diff --git a/be/src/service/query-option-parser.h b/be/src/service/query-option-parser.h
index b87246668..4d31ae416 100644
--- a/be/src/service/query-option-parser.h
+++ b/be/src/service/query-option-parser.h
@@ -69,6 +69,16 @@ class QueryOptionValidator {
     return Status::OK();
   }
 
+  /// Returns OK iff 'value' > 'lower'. Testing '>' (rather than '<=') also rejects NaN.
+  static inline Status ExclusiveLowerBound(
+      TImpalaQueryOptions::type option, const T value, const T lower) {
+    if (value > lower) return Status::OK();
+    // Reached for value <= lower and for NaN, which is unordered w.r.t. 'lower'.
+    std::stringstream ss;
+    ss << "Value must be greater than " << lower << ", actual value: " << value;
+    return CreateValidationErrorStatus(option, ss.str());
+  }
+
   static inline Status NotEquals(
       TImpalaQueryOptions::type option, const T value, const T other) {
     if (value == other) {
@@ -156,6 +166,14 @@ class QueryOptionParser {
     return QueryOptionValidator<T>::InclusiveLowerBound(option, *result, lower);
   }
 
+  /// Parses 'value' into '*result' and requires it to be strictly greater than 'lower'.
+  template <typename T>
+  static Status ParseAndCheckExclusiveLowerBound(TImpalaQueryOptions::type option,
+      const std::string& value, const T lower, T* result) {
+    RETURN_IF_ERROR(Parse(option, value, result));
+    return QueryOptionValidator<T>::ExclusiveLowerBound(option, *result, lower);
+  }
+
   template <typename T>
   static Status ParseAndCheckNonNegative(
       TImpalaQueryOptions::type option, const std::string& value, T* result) {
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 66cea4991..75bd1867a 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -428,6 +428,23 @@ TEST(QueryOptions, SetSpecialOptions) {
     TestError("8191"); // default value of FLAGS_min_buffer_size is 8KB
     TestOk("64KB", 64 * 1024);
   }
+  // QUERY_CPU_COUNT_DIVISOR should be greater than 0.0.
+  {
+    OptionDef<double> key_def = MAKE_OPTIONDEF(query_cpu_count_divisor);
+    auto TestOk = MakeTestOkFn(options, key_def);
+    auto TestError = MakeTestErrFn(options, key_def);
+    TestOk("0.5", 0.5);
+    TestOk("0.0000000001", 0.0000000001);
+    TestOk("0.999999999", 0.999999999);
+    TestOk(" 0.9", 0.9);
+    TestOk("1", 1.0);
+    TestOk("1.1", 1.1);
+    TestOk("1000.00", 1000.0);
+    TestError("0");
+    TestError("-1");
+    TestError("-0.1");
+    TestError("Not a number!");
+  }
 }
 
 TEST(QueryOptions, ParseQueryOptions) {
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ec6dfce5e..0cf88b904 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1195,6 +1195,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_max_num_filters_aggregated_per_host(int32_t_val);
         break;
       }
+      case TImpalaQueryOptions::QUERY_CPU_COUNT_DIVISOR: {
+        double double_val = 0.0f;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckExclusiveLowerBound<double>(
+            option, value, 0.0, &double_val));
+        query_options->__set_query_cpu_count_divisor(double_val);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index d31116d3f..defa0dd7f 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
-      TImpalaQueryOptions::MAX_NUM_FILTERS_AGGREGATED_PER_HOST + 1);                     \
+      TImpalaQueryOptions::QUERY_CPU_COUNT_DIVISOR + 1);                                 \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
@@ -321,6 +321,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE, TQueryOptionLevel::DEVELOPMENT)        \
   QUERY_OPT_FN(max_num_filters_aggregated_per_host, MAX_NUM_FILTERS_AGGREGATED_PER_HOST, \
       TQueryOptionLevel::DEVELOPMENT)                                                    \
+  QUERY_OPT_FN(query_cpu_count_divisor,                                                  \
+      QUERY_CPU_COUNT_DIVISOR, TQueryOptionLevel::ADVANCED)                              \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 8c349aac3..64f2fe0be 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -210,6 +210,7 @@ DEFINE_string(ignored_dir_prefix_list, ".,_tmp.,_spark_metadata",
     " skip in loading file metadata.");
 
 DEFINE_double_hidden(query_cpu_count_divisor, 1.0,
+    "(Deprecated) this is now deprecated in favor of query option with the same name. "
     "(Advance) Divide the CPU requirement of a query to fit the total available CPU in "
     "the executor group. For example, setting value 2 will fit the query with CPU "
     "requirement 2X to an executor group with total available CPU X. Note that setting "
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 3276c2785..d308463dc 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -907,6 +907,15 @@ enum TImpalaQueryOptions {
   // ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting 1, 0, or negative value
   // will disable the intermediate aggregator feature. Default to -1 (disabled).
   MAX_NUM_FILTERS_AGGREGATED_PER_HOST = 172
+
+  // Divide the CPU requirement of a query to fit the total available CPU in
+  // the executor group. For example, setting value 2 will fit the query with CPU
+  // requirement 2X to an executor group with total available CPU X. Note that setting
+  // a fractional value less than 1 effectively multiplies the query CPU
+  // requirement. A valid value is > 0.0.
+  // If this query option is not set, the value of the backend flag
+  // --query_cpu_count_divisor (defaults to 1.0) is used instead.
+  QUERY_CPU_COUNT_DIVISOR = 173
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 794301257..16d348e6e 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -693,6 +693,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   173: optional i32 max_num_filters_aggregated_per_host = -1
+
+  // See comment in ImpalaService.thrift
+  174: optional double query_cpu_count_divisor
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 7be0861a1..31f9a570d 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2080,9 +2080,13 @@ public class Frontend {
     planCtx.compilationState_.captureState();
     boolean isComputeCost = queryOptions.isCompute_processing_cost();
 
+    double cpuCountDivisor = BackendConfig.INSTANCE.getQueryCpuCountDivisor();
     if (isComputeCost) {
+      if (queryOptions.isSetQuery_cpu_count_divisor()) {
+        cpuCountDivisor = queryOptions.getQuery_cpu_count_divisor();
+      }
       FrontendProfile.getCurrent().setToCounter(CPU_COUNT_DIVISOR, TUnit.DOUBLE_VALUE,
-          Double.doubleToLongBits(BackendConfig.INSTANCE.getQueryCpuCountDivisor()));
+          Double.doubleToLongBits(cpuCountDivisor));
     }
 
     TExecutorGroupSet group_set = null;
@@ -2199,9 +2203,8 @@ public class Frontend {
               + queryOptions.getMax_fragment_instances_per_node() + ").");
         }
 
-        scaled_cores_requirement = (int) Math.min(Integer.MAX_VALUE,
-            Math.ceil(
-                cores_requirement / BackendConfig.INSTANCE.getQueryCpuCountDivisor()));
+        scaled_cores_requirement = (int) Math.min(
+            Integer.MAX_VALUE, Math.ceil(cores_requirement / cpuCountDivisor));
         cpuReqSatisfied = scaled_cores_requirement <= available_cores;
 
         addCounter(
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 6718e4572..e7a9e382c 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -1221,11 +1221,31 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
         ["Executor Group: root.small-group",
          "CpuAsk: 6", "EffectiveParallelism: 11",
-         "ExecutorGroupsConsidered: 2"])
+         "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"])
+
+    # Test that QUERY_CPU_COUNT_DIVISOR option can override
+    # query_cpu_count_divisor flag.
+    self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '1.0'})
+    self._run_query_and_verify_profile(CPU_TEST_QUERY,
+        ["Executor Group: root.small-group",
+         "CpuAsk: 11", "EffectiveParallelism: 11",
+         "CpuCountDivisor: 1", "ExecutorGroupsConsidered: 2"])
+    self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '0.5'})
+    self._run_query_and_verify_profile(CPU_TEST_QUERY,
+        ["Executor Group: root.large-group",
+         "CpuAsk: 22", "EffectiveParallelism: 11",
+         "CpuCountDivisor: 0.5", "ExecutorGroupsConsidered: 3"])
+    self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '2.0'})
+    self._run_query_and_verify_profile(CPU_TEST_QUERY,
+        ["Executor Group: root.small-group",
+         "CpuAsk: 6", "EffectiveParallelism: 11",
+         "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"])
 
     # Check resource pools on the Web queries site and admission site
-    self._verify_query_num_for_resource_pool("root.small", 1)
-    self._verify_total_admitted_queries("root.small", 1)
+    self._verify_query_num_for_resource_pool("root.small", 3)
+    self._verify_query_num_for_resource_pool("root.large", 1)
+    self._verify_total_admitted_queries("root.small", 3)
+    self._verify_total_admitted_queries("root.large", 1)
 
   @pytest.mark.execute_serially
   def test_query_cpu_count_divisor_fraction(self):