You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2018/04/18 15:11:43 UTC

[2/3] impala git commit: IMPALA-6847: work around high memory estimates for AC

IMPALA-6847: work around high memory estimates for AC

Adds MAX_MEM_ESTIMATE_FOR_ADMISSION query option, which takes
effect if and only if
* Memory-based admission control is enabled for the pool
* No mem_limit is set (i.e. best practices are not being followed)

In that case min(MAX_MEM_ESTIMATE_FOR_ADMISSION, mem_estimate)
is used for admission control instead of mem_estimate.

This provides a way to override the planner's estimate if
it happens to be incorrect and is preventing the query from
running. Setting MEM_LIMIT is usually a better alternative
but sometimes it is not feasible to set MEM_LIMIT for each
individual query.

Testing:
Added an admission control test to verify that the query option allows
queries with high estimates to run.

Also tested manually on a minicluster started with:

  start-impala-cluster.py --impalad_args='-vmodule admission-controller=3 \
      -default_pool_mem_limit 12884901888'

Change-Id: Ia5fc32a507ad0f00f564dfe4f954a829ac55d14e
Reviewed-on: http://gerrit.cloudera.org:8080/10058
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3ebf30a2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3ebf30a2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3ebf30a2

Branch: refs/heads/master
Commit: 3ebf30a2a4de675388e8f2237cc0ec0c99458cf5
Parents: 834b3b9
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Apr 12 22:07:39 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Apr 18 01:18:20 2018 +0000

----------------------------------------------------------------------
 be/src/service/query-options-test.cc            |  1 +
 be/src/service/query-options.cc                 |  7 ++
 be/src/service/query-options.h                  |  6 +-
 common/thrift/ImpalaInternalService.thrift      |  3 +
 common/thrift/ImpalaService.thrift              |  8 ++
 .../org/apache/impala/service/Frontend.java     | 20 +++++
 .../admission-reject-mem-estimate.test          | 77 ++++++++++++++++++++
 .../custom_cluster/test_admission_controller.py | 24 +++---
 8 files changed, 135 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3ebf30a2/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 3352bd1..a472f2a 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -142,6 +142,7 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(max_row_size),          {1, ROW_SIZE_LIMIT}},
       {MAKE_OPTIONDEF(parquet_file_size),     {-1, I32_MAX}},
       {MAKE_OPTIONDEF(compute_stats_min_sample_size), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(max_mem_estimate_for_admission), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32 {
       {MAKE_OPTIONDEF(runtime_filter_min_size),

http://git-wip-us.apache.org/repos/asf/impala/blob/3ebf30a2/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index b219a00..7395e24 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -637,6 +637,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
                 iequals(value, "true") || iequals(value, "1"));
         break;
       }
+      case TImpalaQueryOptions::MAX_MEM_ESTIMATE_FOR_ADMISSION: {
+        int64_t bytes_limit;
+        RETURN_IF_ERROR(ParseMemValue(
+            value, "max memory estimate for admission", &bytes_limit));
+        query_options->__set_max_mem_estimate_for_admission(bytes_limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";

http://git-wip-us.apache.org/repos/asf/impala/blob/3ebf30a2/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 82e04a1..27d7c6a 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::SHUFFLE_DISTINCT_EXPRS + 1);\
+      TImpalaQueryOptions::MAX_MEM_ESTIMATE_FOR_ADMISSION + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\
@@ -131,6 +131,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(exec_time_limit_s, EXEC_TIME_LIMIT_S, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(shuffle_distinct_exprs, SHUFFLE_DISTINCT_EXPRS,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(max_mem_estimate_for_admission, MAX_MEM_ESTIMATE_FOR_ADMISSION,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
@@ -151,7 +153,7 @@ std::string DebugQueryOptions(const TQueryOptions& query_options);
 
 /// Bitmask for the values of TQueryOptions.
 /// TODO: Find a way to set the size based on the number of fields.
-typedef std::bitset<65> QueryOptionsMask;
+typedef std::bitset<128> QueryOptionsMask;
 
 /// Updates the query options in dst from those in src where the query option is set
 /// (i.e. src->__isset.PROPERTY is true) and the corresponding bit in mask is set. If

http://git-wip-us.apache.org/repos/asf/impala/blob/3ebf30a2/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 8cbc573..b394443 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -279,6 +279,9 @@ struct TQueryOptions {
   // exprs in the second phase which is not required when omitting the distinct exprs in
   // the first phase. Shuffling by both is better if the grouping exprs have low NDVs.
   64: optional bool shuffle_distinct_exprs = true;
+
+  // See comment in ImpalaService.thrift.
+  65: optional i64 max_mem_estimate_for_admission = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/impala/blob/3ebf30a2/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index e25bd60..02d674c 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -290,6 +290,14 @@ enum TImpalaQueryOptions {
   // exprs in the second phase which is not required when omitting the distinct exprs in
   // the first phase. Shuffling by both is better if the grouping exprs have low NDVs.
   SHUFFLE_DISTINCT_EXPRS,
+
+  // This only has an effect if memory-estimate-based admission control is enabled, i.e.
+  // max_mem_resources is set for the pool and, *contrary to best practices*, MEM_LIMIT
+  // is not set. In that case, min(MAX_MEM_ESTIMATE_FOR_ADMISSION,
+  // planner memory estimate) is used for admission control purposes. This provides a
+  // workaround if the planner's memory estimate is too high and prevents a runnable
+  // query from being admitted. 0 or -1 means this has no effect. Defaults to 0.
+  MAX_MEM_ESTIMATE_FOR_ADMISSION,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/impala/blob/3ebf30a2/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 348adaf..ae3975c 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1060,6 +1060,11 @@ public class Frontend {
       queryExecRequest.setLineage_graph(thriftLineageGraph);
     }
 
+    // Override the per_host_mem_estimate sent to the backend if needed. The explain
+    // string is already generated at this point so this does not change the estimate
+    // shown in the plan.
+    checkAndOverrideMemEstimate(queryExecRequest, queryOptions);
+
     if (analysisResult.isExplainStmt()) {
       // Return the EXPLAIN request
       createExplainRequest(explainString.toString(), result);
@@ -1117,6 +1122,21 @@ public class Frontend {
   }
 
   /**
+   * The MAX_MEM_ESTIMATE_FOR_ADMISSION query option can override the planner memory
+   * estimate if set to a positive value. Caps queryExecRequest.per_host_mem_estimate
+   * at that value; unset or <= 0 values leave the planner estimate unchanged.
+   */
+  private void checkAndOverrideMemEstimate(TQueryExecRequest queryExecRequest,
+      TQueryOptions queryOptions) {
+    if (queryOptions.isSetMax_mem_estimate_for_admission()
+        && queryOptions.getMax_mem_estimate_for_admission() > 0) {
+      long effectiveMemEstimate = Math.min(queryExecRequest.getPer_host_mem_estimate(),
+          queryOptions.getMax_mem_estimate_for_admission());
+      queryExecRequest.setPer_host_mem_estimate(effectiveMemEstimate);
+    }
+  }
+
+  /**
    * Attaches the explain result to the TExecRequest.
    */
   private void createExplainRequest(String explainString, TExecRequest result) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3ebf30a2/testdata/workloads/functional-query/queries/QueryTest/admission-reject-mem-estimate.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-mem-estimate.test b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-mem-estimate.test
new file mode 100644
index 0000000..8868c9f
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-mem-estimate.test
@@ -0,0 +1,77 @@
+====
+---- QUERY
+# Small query where estimate exceeds available memory but the query can execute with a
+# low mem_limit.
+select min(n_name) from tpch_parquet.nation
+---- CATCH
+Rejected query from pool default-pool: request memory needed 26.00 MB is greater than pool max mem resources 10.00 MB
+====
+---- QUERY
+# Overriding the memory estimate should allow the query to execute.
+set max_mem_estimate_for_admission=10m;
+select min(n_name) from tpch_parquet.nation
+---- RESULTS
+'ALGERIA'
+---- RUNTIME_PROFILE
+row_regex: .*Query Options \(set by configuration\).*MAX_MEM_ESTIMATE_FOR_ADMISSION=10485760.*
+# Memory estimate sent to backend is overridden, but the explain plan shows the planner output.
+row_regex: .*Estimated Per-Host Mem: 10485760.*
+row_regex: .*Per-Host Resource Estimates: Memory=26.00MB.*
+====
+---- QUERY
+# If the estimate is set to a higher value that is still > the max mem resources for
+# the pool, the query still can't execute.
+set max_mem_estimate_for_admission=20m;
+select min(n_name) from tpch_parquet.nation
+---- CATCH
+Rejected query from pool default-pool: request memory needed 20.00 MB is greater than pool max mem resources 10.00 MB
+====
+---- QUERY
+# MEM_LIMIT takes precedence over MAX_MEM_ESTIMATE_FOR_ADMISSION, so the query is
+# rejected if MEM_LIMIT is higher.
+set mem_limit=20m;
+set max_mem_estimate_for_admission=10m;
+select min(n_name) from tpch_parquet.nation
+---- CATCH
+Rejected query from pool default-pool: request memory needed 20.00 MB is greater than pool max mem resources 10.00 MB
+====
+---- QUERY
+# MEM_LIMIT takes precedence over MAX_MEM_ESTIMATE_FOR_ADMISSION, so the query is
+# admitted if MEM_LIMIT is lower.
+set mem_limit=10m;
+set max_mem_estimate_for_admission=20m;
+select min(n_name) from tpch_parquet.nation
+---- RESULTS
+'ALGERIA'
+---- RUNTIME_PROFILE
+row_regex: .*Query Options \(set by configuration\).*MEM_LIMIT=10485760.*MAX_MEM_ESTIMATE_FOR_ADMISSION=20971520.*
+# Memory estimate sent to backend is overridden, but the explain plan shows the planner output.
+row_regex: .*Estimated Per-Host Mem: 20971520.*
+row_regex: .*Per-Host Resource Estimates: Memory=26.00MB.*
+====
+---- QUERY
+# Larger queries that use more memory than the estimate can still run because no mem_limit is set.
+set max_mem_estimate_for_admission=10m;
+select min(l_comment) from tpch_parquet.lineitem
+---- RESULTS
+' Tiresias '
+---- RUNTIME_PROFILE
+row_regex: .*Query Options \(set by configuration\).*MAX_MEM_ESTIMATE_FOR_ADMISSION=10485760.*
+# Memory estimate sent to backend is overridden, but the explain plan shows the planner output.
+row_regex: .*Estimated Per-Host Mem: 10485760.*
+row_regex: .*Per-Host Resource Estimates: Memory=90.00MB.*
+====
+---- QUERY
+# If the memory estimate is less than max_mem_estimate_for_admission, then the estimate
+# is used for admission. This query's memory estimate is 10MB, and it would be rejected
+# if the memory estimate was > 10MB.
+set max_mem_estimate_for_admission=20m;
+select 'foo'
+---- RESULTS
+'foo'
+---- RUNTIME_PROFILE
+row_regex: .*Query Options \(set by configuration\).*MAX_MEM_ESTIMATE_FOR_ADMISSION=20971520.*
+# Estimate is below MAX_MEM_ESTIMATE_FOR_ADMISSION, so it is sent to the backend unchanged.
+row_regex: .*Estimated Per-Host Mem: 10485760.*
+row_regex: .*Per-Host Resource Estimates: Memory=10.00MB.*
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/3ebf30a2/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index ec32cce..ab09069 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -384,17 +384,23 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
           pool_max_mem=10 * 1024 * 1024, proc_mem_limit=1024 * 1024 * 1024),
       statestored_args=_STATESTORED_ARGS)
-  def test_reject_min_reservation(self, vector):
-    """Test that the query will be rejected by admission control if:
-       a) the largest per-backend min buffer reservation is larger than the query mem
-          limit
-       b) the largest per-backend min buffer reservation is larger than the
-          buffer_pool_limit query option
-       c) the cluster-wide min-buffer reservation size is larger than the pool memory
-          resources.
-    """
+  def test_memory_rejection(self, vector):
+    """Test that rejection of queries based on reservation and estimates works as
+    expected."""
+    # Test that the query will be rejected by admission control if:
+    # a) the largest per-backend min buffer reservation is larger than the query mem limit
+    # b) the largest per-backend min buffer reservation is larger than the
+    #    buffer_pool_limit query option
+    # c) the cluster-wide min-buffer reservation size is larger than the pool memory
+    #    resources.
     self.run_test_case('QueryTest/admission-reject-min-reservation', vector)
 
+    # Test that queries are rejected based on memory estimates. Set num_nodes=1 to
+    # avoid unpredictability from scheduling on different backends.
+    exec_options = vector.get_value('exec_option')
+    exec_options['num_nodes'] = 1
+    self.run_test_case('QueryTest/admission-reject-mem-estimate', vector)
+
   # Process mem_limit used in test_mem_limit_upper_bound
   PROC_MEM_TEST_LIMIT = 1024 * 1024 * 1024