You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/08/23 08:07:21 UTC

[impala] branch master updated: IMPALA-10860: Allow setting mem_limit for coordinators

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 05bc48585 IMPALA-10860: Allow setting mem_limit for coordinators
05bc48585 is described below

commit 05bc485851a255b1b00bdb1a1086ba5527877768
Author: Abhishek Rawat <ar...@cloudera.com>
AuthorDate: Fri Aug 18 10:18:46 2023 -0700

    IMPALA-10860: Allow setting mem_limit for coordinators
    
    Added support for the MEM_LIMIT_COORDINATORS query option. This is
    similar to the existing MEM_LIMIT_EXECUTORS, but applies to coordinators.
    There are cases where the Planner generates inaccurate estimates for
    coordinator fragments, and it would be good to be able to set a mem limit
    just for the coordinator, since a query's memory requirement on the
    coordinator tends to be much lower than that on executors.
    
    If MEM_LIMIT is set, then MEM_LIMIT_COORDINATORS is ignored.
    
    Also updated the documentation for the new query option.
    
    Testing:
    - Added new custom cluster tests which validate that MEM_LIMIT_COORDINATORS
    applies only on the coordinator. The tests also validate that both
    MEM_LIMIT_EXECUTORS and MEM_LIMIT_COORDINATORS can be set together.
    - Built docs and made sure that the new changes have proper formatting.
    
    Change-Id: I2dfc9a735e82dce2fd903bdaf6bc2e46e982ef8c
    Reviewed-on: http://gerrit.cloudera.org:8080/20378
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/schedule-state.cc               | 11 +++++-
 be/src/service/query-options-test.cc              |  1 +
 be/src/service/query-options.cc                   |  6 ++++
 be/src/service/query-options.h                    |  6 ++--
 common/thrift/ImpalaService.thrift                |  8 +++++
 common/thrift/Query.thrift                        |  3 ++
 docs/topics/impala_mem_limit.xml                  | 21 +++++++++++-
 tests/custom_cluster/test_admission_controller.py | 41 +++++++++++++++++++++++
 8 files changed, 93 insertions(+), 4 deletions(-)

diff --git a/be/src/scheduling/schedule-state.cc b/be/src/scheduling/schedule-state.cc
index 59e8c78c8..c4b43ceb5 100644
--- a/be/src/scheduling/schedule-state.cc
+++ b/be/src/scheduling/schedule-state.cc
@@ -315,6 +315,14 @@ void ScheduleState::UpdateMemoryRequirements(const TPoolConfig& pool_cfg,
     }
   }
 
+  // Enforce the MEM_LIMIT_COORDINATORS query option if MEM_LIMIT is not specified.
+  const bool is_mem_limit_coordinators_set =
+      query_options().__isset.mem_limit_coordinators
+      && query_options().mem_limit_coordinators > 0;
+  if (!is_mem_limit_set && is_mem_limit_coordinators_set) {
+    coord_backend_mem_to_admit = query_options().mem_limit_coordinators;
+  }
+
   // Enforce the MEM_LIMIT_EXECUTORS query option if MEM_LIMIT is not specified.
   const bool is_mem_limit_executors_set = query_options().__isset.mem_limit_executors
       && query_options().mem_limit_executors > 0;
@@ -336,7 +344,8 @@ void ScheduleState::UpdateMemoryRequirements(const TPoolConfig& pool_cfg,
   }
 
   int64_t per_backend_mem_limit;
-  if (mimic_old_behaviour && !is_mem_limit_set && !is_mem_limit_executors_set) {
+  if (mimic_old_behaviour && !is_mem_limit_set && !is_mem_limit_executors_set
+      && !is_mem_limit_coordinators_set) {
     per_backend_mem_limit = -1;
     query_schedule_pb_->set_coord_backend_mem_limit(-1);
   } else {
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index a1c8182bf..e1065789e 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -160,6 +160,7 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(max_result_spooling_mem), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(max_spilled_result_spooling_mem), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(large_agg_mem_threshold), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(mem_limit_coordinators), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index d95c10418..2db91041d 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1140,6 +1140,12 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_agg_mem_correlation_factor(double_val);
         break;
       }
+      case TImpalaQueryOptions::MEM_LIMIT_COORDINATORS: {
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_mem_limit_coordinators(mem_spec_val.value);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index bb859b4fe..74de1b972 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
-      TImpalaQueryOptions::AGG_MEM_CORRELATION_FACTOR + 1);                              \
+      TImpalaQueryOptions::MEM_LIMIT_COORDINATORS + 1);                              \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
@@ -193,7 +193,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(now_string, NOW_STRING, TQueryOptionLevel::DEVELOPMENT)                   \
   QUERY_OPT_FN(parquet_object_store_split_size, PARQUET_OBJECT_STORE_SPLIT_SIZE,         \
       TQueryOptionLevel::ADVANCED)                                                       \
-  QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT) \
+  QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::ADVANCED)    \
   QUERY_OPT_FN(                                                                          \
       broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)         \
   QUERY_OPT_FN(preagg_bytes_limit, PREAGG_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)      \
@@ -304,6 +304,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       large_agg_mem_threshold, LARGE_AGG_MEM_THRESHOLD, TQueryOptionLevel::ADVANCED)     \
   QUERY_OPT_FN(agg_mem_correlation_factor, AGG_MEM_CORRELATION_FACTOR,                   \
       TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(mem_limit_coordinators, MEM_LIMIT_COORDINATORS,                           \
+      TQueryOptionLevel::ADVANCED)                                                       \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0ab6be4cc..0ee327216 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -833,6 +833,14 @@ enum TImpalaQueryOptions {
   // Valid values are in [0.0, 1.0]. Setting value 1.0 will result in an equal memory
   // estimate as the default estimation (no change). Default to 0.5.
   AGG_MEM_CORRELATION_FACTOR = 163
+
+  // A per coordinator approximate limit on the memory consumption
+  // of this query. Only applied if MEM_LIMIT is not specified.
+  // Unspecified or a limit of 0 or negative value means no limit;
+  // Otherwise specified either as:
+  // a) an int (= number of bytes);
+  // b) a float followed by "M" (MB) or "G" (GB)
+  MEM_LIMIT_COORDINATORS = 164
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index e67aba9c8..033e756ed 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -657,6 +657,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   164: optional double agg_mem_correlation_factor = 0.5
+
+  // See comment in ImpalaService.thrift
+  165: optional i64 mem_limit_coordinators = 0;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/docs/topics/impala_mem_limit.xml b/docs/topics/impala_mem_limit.xml
index 02a1d7750..a9a5ca1ba 100644
--- a/docs/topics/impala_mem_limit.xml
+++ b/docs/topics/impala_mem_limit.xml
@@ -213,7 +213,7 @@ information about the per-node memory requirements.</codeblock>
 <concept id="mem_limit_executors">
   <title>MEM_LIMIT_EXECUTORS Query Option</title>
     <conbody>
-      <note>This is a developer-only query option. Setting this query option is not recommended
+      <note>This is an advanced query option. Setting this query option is not recommended
         unless specifically advised.</note>
       <p>The existing <codeph>MEM_LIMIT</codeph> query option applies to all impala coordinators and
         executors. This means that the same amount of memory gets reserved but coordinators
@@ -229,4 +229,23 @@ information about the per-node memory requirements.</codeblock>
           <codeph>MEM_LIMIT</codeph>. If you set both, only <codeph>MEM_LIMIT</codeph> applies.</p>
     </conbody>
 </concept>
+<concept id="mem_limit_coordinators">
+  <title>MEM_LIMIT_COORDINATORS Query Option</title>
+    <conbody>
+      <note>This is an advanced query option. Setting this query option is not recommended
+        unless specifically advised.</note>
+      <p>The existing <codeph>MEM_LIMIT</codeph> query option applies to all impala coordinators and
+        executors. This means that the same amount of memory gets reserved but coordinators
+        typically just do the job of coordinating the query and thus do not necessarily need all the
+        estimated memory. Blocking the estimated memory on coordinators blocks the memory to be used
+        for other queries.</p>
+      <p>The new <codeph>MEM_LIMIT_COORDINATORS</codeph> query option functions similarly to the
+          <codeph>MEM_LIMIT</codeph> option but sets the query memory limit only on coordinators. This
+        new option addresses the issue related to <codeph>MEM_LIMIT</codeph> and is recommended in
+        scenarios where the query needs higher or lower memory on coordinators compared to the planner
+        estimates.</p>
+      <p>Note that the <codeph>MEM_LIMIT_COORDINATORS</codeph> option does not work with
+          <codeph>MEM_LIMIT</codeph>. If you set both, only <codeph>MEM_LIMIT</codeph> applies.</p>
+    </conbody>
+</concept>
 </concept>
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 87d0508bd..43199fc00 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -717,6 +717,47 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     assert expected_mem_limits['executor'] == float(
       expected_exec_mem_limit), expected_mem_limits
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=2,
+      impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
+          pool_max_mem=2 * 1024 * 1024 * 1024, proc_mem_limit=3 * 1024 * 1024 * 1024))
+  def test_mem_limit_coordinators(self, vector, unique_database):
+    """Verify that the query option mem_limit_coordinators is only enforced on the
+    coordinators."""
+    expected_exec_mem_limit = "999999999"
+    expected_coord_mem_limit = "111111111"
+    ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
+    self.client.set_configuration({"MEM_LIMIT_EXECUTORS": expected_exec_mem_limit,
+        "MEM_LIMIT_COORDINATORS": expected_coord_mem_limit})
+    handle = self.client.execute_async(QUERY.format(1))
+    self.client.wait_for_finished_timeout(handle, 1000)
+    expected_mem_limits = self.__get_mem_limits_admission_debug_page()
+    assert expected_mem_limits['executor'] == float(
+      expected_exec_mem_limit), expected_mem_limits
+    assert expected_mem_limits['coordinator'] == float(
+      expected_coord_mem_limit), expected_mem_limits
+
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=2,
+      impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
+          pool_max_mem=2 * 1024 * 1024 * 1024, proc_mem_limit=3 * 1024 * 1024 * 1024))
+  def test_mem_limits(self, vector, unique_database):
+    """Verify that the query option mem_limit_coordinators and mem_limit_executors are
+    ignored when mem_limit is set."""
+    exec_mem_limit = "999999999"
+    coord_mem_limit = "111111111"
+    mem_limit = "888888888"
+    ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
+    self.client.set_configuration({"MEM_LIMIT_EXECUTORS": exec_mem_limit,
+        "MEM_LIMIT_COORDINATORS": coord_mem_limit, "MEM_LIMIT": mem_limit})
+    handle = self.client.execute_async(QUERY.format(1))
+    self.client.wait_for_finished_timeout(handle, 1000)
+    expected_mem_limits = self.__get_mem_limits_admission_debug_page()
+    assert expected_mem_limits['executor'] == float(mem_limit), expected_mem_limits
+    assert expected_mem_limits['coordinator'] == float(mem_limit), expected_mem_limits
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,