You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2024/01/04 18:42:02 UTC
(impala) 01/03: IMPALA-12654: Add query option QUERY_CPU_COUNT_DIVISOR
This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit ac8ffa9125fc3be6f4ceb958ea4930b7834f9292
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Dec 18 15:47:27 2023 -0800
IMPALA-12654: Add query option QUERY_CPU_COUNT_DIVISOR
IMPALA-11604 adds a hidden backend flag named query_cpu_count_divisor to
allow oversubscribing CPU cores more than what is available in the
executor group set. This patch adds a query option with the same name
and function so that CPU core matching can be tuned for individual
queries. The query option takes precedence over the flag.
Testing:
- Add test case in test_executor_groups.py and query-options-test.cc
Change-Id: I34ab47bd67509a02790c3caedb3fde4d1b6eaa78
Reviewed-on: http://gerrit.cloudera.org:8080/20819
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/service/query-option-parser.h | 18 +++++++++++++++
be/src/service/query-options-test.cc | 17 ++++++++++++++
be/src/service/query-options.cc | 7 ++++++
be/src/service/query-options.h | 4 +++-
be/src/util/backend-gflag-util.cc | 1 +
common/thrift/ImpalaService.thrift | 9 ++++++++
common/thrift/Query.thrift | 3 +++
.../java/org/apache/impala/service/Frontend.java | 11 +++++----
tests/custom_cluster/test_executor_groups.py | 26 +++++++++++++++++++---
9 files changed, 88 insertions(+), 8 deletions(-)
diff --git a/be/src/service/query-option-parser.h b/be/src/service/query-option-parser.h
index b87246668..4d31ae416 100644
--- a/be/src/service/query-option-parser.h
+++ b/be/src/service/query-option-parser.h
@@ -69,6 +69,16 @@ class QueryOptionValidator {
return Status::OK();
}
+ /// Validates that 'value' is strictly greater than 'lower'.
+ /// Returns Status::OK() when value > lower. Otherwise returns the status produced
+ /// by CreateValidationErrorStatus() for 'option', with a message that names the
+ /// bound and the offending value.
+ static inline Status ExclusiveLowerBound(
+ TImpalaQueryOptions::type option, const T value, const T lower) {
+ // Written as !(value > lower) instead of (value <= lower): every ordered
+ // comparison involving a floating-point NaN is false, so the '<=' form would let
+ // a NaN slip through as "valid" (if the parser ever yields one). For all other
+ // values, and for integral T, both forms are equivalent.
+ if (!(value > lower)) {
+ std::stringstream ss;
+ ss << "Value must be greater than " << lower << ", actual value: " << value;
+ return CreateValidationErrorStatus(option, ss.str());
+ }
+ return Status::OK();
+ }
+
static inline Status NotEquals(
TImpalaQueryOptions::type option, const T value, const T other) {
if (value == other) {
@@ -156,6 +166,14 @@ class QueryOptionParser {
return QueryOptionValidator<T>::InclusiveLowerBound(option, *result, lower);
}
+ /// Parses 'value' into '*result' via Parse() and then checks the parsed number
+ /// against 'lower' using QueryOptionValidator<T>::ExclusiveLowerBound().
+ /// Leaves early through RETURN_IF_ERROR if Parse() reports a failure; otherwise
+ /// the validator's status is returned to the caller.
+ template <typename T>
+ static Status ParseAndCheckExclusiveLowerBound(TImpalaQueryOptions::type option,
+ const std::string& value, const T lower, T* result) {
+ RETURN_IF_ERROR(Parse(option, value, result));
+ return QueryOptionValidator<T>::ExclusiveLowerBound(option, *result, lower);
+ }
+
template <typename T>
static Status ParseAndCheckNonNegative(
TImpalaQueryOptions::type option, const std::string& value, T* result) {
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 66cea4991..75bd1867a 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -428,6 +428,23 @@ TEST(QueryOptions, SetSpecialOptions) {
TestError("8191"); // default value of FLAGS_min_buffer_size is 8KB
TestOk("64KB", 64 * 1024);
}
+ // QUERY_CPU_COUNT_DIVISOR should be greater than 0.0.
+ {
+ OptionDef<double> key_def = MAKE_OPTIONDEF(query_cpu_count_divisor);
+ auto TestOk = MakeTestOkFn(options, key_def);
+ auto TestError = MakeTestErrFn(options, key_def);
+ TestOk("0.5", 0.5);
+ TestOk("0.0000000001", 0.0000000001);
+ TestOk("0.999999999", 0.999999999);
+ TestOk(" 0.9", 0.9);
+ TestOk("1", 1.0);
+ TestOk("1.1", 1.1);
+ TestOk("1000.00", 1000.0);
+ TestError("0");
+ TestError("-1");
+ TestError("-0.1");
+ TestError("Not a number!");
+ }
}
TEST(QueryOptions, ParseQueryOptions) {
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ec6dfce5e..0cf88b904 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1195,6 +1195,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_max_num_filters_aggregated_per_host(int32_t_val);
break;
}
+ case TImpalaQueryOptions::QUERY_CPU_COUNT_DIVISOR: {
+ double double_val = 0.0f;
+ RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckExclusiveLowerBound<double>(
+ option, value, 0.0, &double_val));
+ query_options->__set_query_cpu_count_divisor(double_val);
+ break;
+ }
default:
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index d31116d3f..defa0dd7f 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE \
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \
- TImpalaQueryOptions::MAX_NUM_FILTERS_AGGREGATED_PER_HOST + 1); \
+ TImpalaQueryOptions::QUERY_CPU_COUNT_DIVISOR + 1); \
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \
@@ -321,6 +321,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE, TQueryOptionLevel::DEVELOPMENT) \
QUERY_OPT_FN(max_num_filters_aggregated_per_host, MAX_NUM_FILTERS_AGGREGATED_PER_HOST, \
TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(query_cpu_count_divisor, \
+ QUERY_CPU_COUNT_DIVISOR, TQueryOptionLevel::ADVANCED) \
;
/// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 8c349aac3..64f2fe0be 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -210,6 +210,7 @@ DEFINE_string(ignored_dir_prefix_list, ".,_tmp.,_spark_metadata",
" skip in loading file metadata.");
DEFINE_double_hidden(query_cpu_count_divisor, 1.0,
+ "(Deprecated) this is now deprecated in favor of query option with the same name. "
"(Advance) Divide the CPU requirement of a query to fit the total available CPU in "
"the executor group. For example, setting value 2 will fit the query with CPU "
"requirement 2X to an executor group with total available CPU X. Note that setting "
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 3276c2785..d308463dc 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -907,6 +907,15 @@ enum TImpalaQueryOptions {
// ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting 1, 0, or negative value
// will disable the intermediate aggregator feature. Default to -1 (disabled).
MAX_NUM_FILTERS_AGGREGATED_PER_HOST = 172
+
+ // Divide the CPU requirement of a query to fit the total available CPU in
+ // the executor group. For example, setting value 2 will fit the query with CPU
+ // requirement 2X to an executor group with total available CPU X. Note that setting
+ // a fractional value less than 1 effectively multiplies the query CPU
+ // requirement. A valid value is > 0.0.
+ // If this query option is not set, the value of backend flag --query_cpu_count_divisor
+ // (defaults to 1.0) is used instead.
+ QUERY_CPU_COUNT_DIVISOR = 173
}
// The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 794301257..16d348e6e 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -693,6 +693,9 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
173: optional i32 max_num_filters_aggregated_per_host = -1
+
+ // See comment in ImpalaService.thrift
+ 174: optional double query_cpu_count_divisor
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 7be0861a1..31f9a570d 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2080,9 +2080,13 @@ public class Frontend {
planCtx.compilationState_.captureState();
boolean isComputeCost = queryOptions.isCompute_processing_cost();
+ double cpuCountDivisor = BackendConfig.INSTANCE.getQueryCpuCountDivisor();
if (isComputeCost) {
+ if (queryOptions.isSetQuery_cpu_count_divisor()) {
+ cpuCountDivisor = queryOptions.getQuery_cpu_count_divisor();
+ }
FrontendProfile.getCurrent().setToCounter(CPU_COUNT_DIVISOR, TUnit.DOUBLE_VALUE,
- Double.doubleToLongBits(BackendConfig.INSTANCE.getQueryCpuCountDivisor()));
+ Double.doubleToLongBits(cpuCountDivisor));
}
TExecutorGroupSet group_set = null;
@@ -2199,9 +2203,8 @@ public class Frontend {
+ queryOptions.getMax_fragment_instances_per_node() + ").");
}
- scaled_cores_requirement = (int) Math.min(Integer.MAX_VALUE,
- Math.ceil(
- cores_requirement / BackendConfig.INSTANCE.getQueryCpuCountDivisor()));
+ scaled_cores_requirement = (int) Math.min(
+ Integer.MAX_VALUE, Math.ceil(cores_requirement / cpuCountDivisor));
cpuReqSatisfied = scaled_cores_requirement <= available_cores;
addCounter(
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 6718e4572..e7a9e382c 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -1221,11 +1221,31 @@ class TestExecutorGroups(CustomClusterTestSuite):
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.small-group",
"CpuAsk: 6", "EffectiveParallelism: 11",
- "ExecutorGroupsConsidered: 2"])
+ "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"])
+
+ # Test that QUERY_CPU_COUNT_DIVISOR option can override
+ # query_cpu_count_divisor flag.
+ self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '1.0'})
+ self._run_query_and_verify_profile(CPU_TEST_QUERY,
+ ["Executor Group: root.small-group",
+ "CpuAsk: 11", "EffectiveParallelism: 11",
+ "CpuCountDivisor: 1", "ExecutorGroupsConsidered: 2"])
+ self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '0.5'})
+ self._run_query_and_verify_profile(CPU_TEST_QUERY,
+ ["Executor Group: root.large-group",
+ "CpuAsk: 22", "EffectiveParallelism: 11",
+ "CpuCountDivisor: 0.5", "ExecutorGroupsConsidered: 3"])
+ self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '2.0'})
+ self._run_query_and_verify_profile(CPU_TEST_QUERY,
+ ["Executor Group: root.small-group",
+ "CpuAsk: 6", "EffectiveParallelism: 11",
+ "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"])
# Check resource pools on the Web queries site and admission site
- self._verify_query_num_for_resource_pool("root.small", 1)
- self._verify_total_admitted_queries("root.small", 1)
+ self._verify_query_num_for_resource_pool("root.small", 3)
+ self._verify_query_num_for_resource_pool("root.large", 1)
+ self._verify_total_admitted_queries("root.small", 3)
+ self._verify_total_admitted_queries("root.large", 1)
@pytest.mark.execute_serially
def test_query_cpu_count_divisor_fraction(self):