You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/09/27 04:24:03 UTC

[impala] branch master updated (93fb918 -> 104a454)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 93fb918  IMPALA-8647: fix round-to-zero in planner estimates
     new 548106f  IMPALA-8451,IMPALA-8905: enable admission control for dockerised tests
     new 104a454  IMPALA-8928: Add MEM_LIMIT_EXECUTORS query option

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/scheduling/admission-controller.cc          | 20 ++++++++---
 be/src/scheduling/admission-controller.h           |  4 +++
 be/src/scheduling/query-schedule.cc                | 19 ++++++----
 be/src/service/impala-http-handler.h               |  1 +
 be/src/service/query-options-test.cc               |  1 +
 be/src/service/query-options.cc                    |  8 +++++
 be/src/service/query-options.h                     |  5 +--
 bin/jenkins/dockerized-impala-run-tests.sh         |  4 +++
 common/thrift/ImpalaInternalService.thrift         |  3 ++
 common/thrift/ImpalaService.thrift                 |  8 +++++
 common/thrift/metrics.json                         | 10 ++++++
 .../test/resources/minicluster-fair-scheduler.xml  | 29 ++++++++++++++++
 fe/src/test/resources/minicluster-llama-site.xml   | 40 ++++++++++++++++++++++
 .../QueryTest/admission-reject-mem-estimate.test   |  6 ++--
 .../queries/QueryTest/compute-stats.test           |  2 +-
 .../queries/QueryTest/large_strings.test           | 16 +++++++++
 .../queries/QueryTest/spilling-large-rows.test     |  5 +++
 .../queries/QueryTest/thread-limits.test           | 10 +++---
 tests/custom_cluster/test_admission_controller.py  | 19 +++++++++-
 tests/query_test/test_insert.py                    |  1 +
 tests/query_test/test_observability.py             |  5 ++-
 tests/query_test/test_query_mem_limit.py           |  2 ++
 tests/webserver/test_web_pages.py                  | 33 +++++++++++-------
 www/admission_controller.tmpl                      |  5 +++
 24 files changed, 220 insertions(+), 36 deletions(-)
 create mode 100644 fe/src/test/resources/minicluster-fair-scheduler.xml
 create mode 100644 fe/src/test/resources/minicluster-llama-site.xml


[impala] 01/02: IMPALA-8451, IMPALA-8905: enable admission control for dockerised tests

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 548106f5e19c6ece17ef25b5ce2ece6881f00098
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Sat Jul 27 19:06:52 2019 -0700

    IMPALA-8451,IMPALA-8905: enable admission control for dockerised tests
    
    This gives us some additional coverage for using admission
    control in a simple but realistic configuration.
    
    What are the implications of this change for test stability and
    flakiness?
    
    On one hand were are adding some more unpredictability
    to tests, because they may be queued for an arbitrary amount of
    time. On the other, we can prevent queries from contending over
    memory. Currently we rely on luck to prevent concurrent queries
    from forcing each other out-of-memory.
    
    I think the unpredictability from the queueing is
    preferable, because we can generally work around these by
    fixing tests that are sensitive to being queued, whereas
    contention over memory requires us to use crude workarounds
    like forcing tests to execute serially.
    
    Added observability for the configured queue wait time for each pool.
    I noticed that I did not have a direct way to observe the effective
    value when I set configs. This is IMPALA-8905.
    
    I had to tweak tests in a few ways:
    * Tests with large strings needed higher memory limits.
    * Hardcoded instances of default-pool had to handle root.default
      as well.
    * test_query_mem_limit needed to run without a mem_limit. I
      created a special pool root.no-limits with no memory limits
      to allow that.
    
    Testing:
    Ran the dockerised build 5-6 times to flush out flaky tests.
    
    Change-Id: I7517673f9e348780fcf7cd6ce1f12c9c5a55373a
    Reviewed-on: http://gerrit.cloudera.org:8080/13942
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/admission-controller.cc          | 20 ++++++++---
 be/src/scheduling/admission-controller.h           |  4 +++
 be/src/service/impala-http-handler.h               |  1 +
 bin/jenkins/dockerized-impala-run-tests.sh         |  4 +++
 common/thrift/metrics.json                         | 10 ++++++
 .../test/resources/minicluster-fair-scheduler.xml  | 29 ++++++++++++++++
 fe/src/test/resources/minicluster-llama-site.xml   | 40 ++++++++++++++++++++++
 .../QueryTest/admission-reject-mem-estimate.test   |  6 ++--
 .../queries/QueryTest/compute-stats.test           |  2 +-
 .../queries/QueryTest/large_strings.test           | 16 +++++++++
 .../queries/QueryTest/spilling-large-rows.test     |  5 +++
 .../queries/QueryTest/thread-limits.test           | 10 +++---
 tests/query_test/test_insert.py                    |  1 +
 tests/query_test/test_observability.py             |  5 ++-
 tests/query_test/test_query_mem_limit.py           |  2 ++
 tests/webserver/test_web_pages.py                  | 33 +++++++++++-------
 www/admission_controller.tmpl                      |  5 +++
 17 files changed, 166 insertions(+), 27 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 6e719b2..ad78b5f 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -120,6 +120,8 @@ const string POOL_MAX_REQUESTS_METRIC_KEY_FORMAT =
   "admission-controller.pool-max-requests.$0";
 const string POOL_MAX_QUEUED_METRIC_KEY_FORMAT =
   "admission-controller.pool-max-queued.$0";
+const string POOL_QUEUE_TIMEOUT_METRIC_KEY_FORMAT =
+  "admission-controller.pool-queue-timeout.$0";
 const string POOL_MAX_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT =
   "admission-controller.pool-max-query-mem-limit.$0";
 const string POOL_MIN_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT =
@@ -777,6 +779,7 @@ void AdmissionController::PoolStats::UpdateConfigMetrics(
   metrics_.pool_max_mem_resources->SetValue(pool_cfg.max_mem_resources);
   metrics_.pool_max_requests->SetValue(pool_cfg.max_requests);
   metrics_.pool_max_queued->SetValue(pool_cfg.max_queued);
+  metrics_.pool_queue_timeout->SetValue(GetQueueTimeoutForPoolMs(pool_cfg));
   metrics_.max_query_mem_limit->SetValue(pool_cfg.max_query_mem_limit);
   metrics_.min_query_mem_limit->SetValue(pool_cfg.min_query_mem_limit);
   metrics_.clamp_mem_limit_query_option->SetValue(pool_cfg.clamp_mem_limit_query_option);
@@ -891,11 +894,7 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
       PROFILE_INFO_KEY_LAST_QUEUED_REASON, queue_node.not_admitted_reason);
   request.query_events->MarkEvent(QUERY_EVENT_QUEUED);
 
-  int64_t queue_wait_timeout_ms = FLAGS_queue_wait_timeout_ms;
-  if (pool_cfg.__isset.queue_timeout_ms) {
-    queue_wait_timeout_ms = pool_cfg.queue_timeout_ms;
-  }
-  queue_wait_timeout_ms = max<int64_t>(0, queue_wait_timeout_ms);
+  int64_t queue_wait_timeout_ms = GetQueueTimeoutForPoolMs(pool_cfg);
   int64_t wait_start_ms = MonotonicMillis();
 
   // Block in Get() up to the time out, waiting for the promise to be set when the query
@@ -1446,6 +1445,13 @@ void AdmissionController::DequeueLoop() {
   }
 }
 
+int64_t AdmissionController::GetQueueTimeoutForPoolMs(const TPoolConfig& pool_config) {
+  int64_t queue_wait_timeout_ms = pool_config.__isset.queue_timeout_ms ?
+      pool_config.queue_timeout_ms :
+      FLAGS_queue_wait_timeout_ms;
+  return max<int64_t>(0, queue_wait_timeout_ms);
+}
+
 int64_t AdmissionController::GetMaxToDequeue(RequestQueue& queue, PoolStats* stats,
     const TPoolConfig& pool_config, int64_t cluster_size) {
   if (PoolLimitsRunningQueriesCount(pool_config)) {
@@ -1672,6 +1678,8 @@ void AdmissionController::PoolStats::ToJson(
       document->GetAllocator());
   pool->AddMember(
       "pool_max_queued", metrics_.pool_max_queued->GetValue(), document->GetAllocator());
+  pool->AddMember("pool_queue_timeout", metrics_.pool_queue_timeout->GetValue(),
+      document->GetAllocator());
   pool->AddMember("max_query_mem_limit", metrics_.max_query_mem_limit->GetValue(),
       document->GetAllocator());
   pool->AddMember("min_query_mem_limit", metrics_.min_query_mem_limit->GetValue(),
@@ -1766,6 +1774,8 @@ void AdmissionController::PoolStats::InitMetrics() {
       POOL_MAX_REQUESTS_METRIC_KEY_FORMAT, 0, name_);
   metrics_.pool_max_queued = parent_->metrics_group_->AddGauge(
       POOL_MAX_QUEUED_METRIC_KEY_FORMAT, 0, name_);
+  metrics_.pool_queue_timeout = parent_->metrics_group_->AddGauge(
+      POOL_QUEUE_TIMEOUT_METRIC_KEY_FORMAT, 0, name_);
   metrics_.max_query_mem_limit = parent_->metrics_group_->AddGauge(
       POOL_MAX_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT, 0, name_);
   metrics_.min_query_mem_limit = parent_->metrics_group_->AddGauge(
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 24c810f..95c80e5 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -478,6 +478,7 @@ class AdmissionController {
       IntGauge* pool_max_mem_resources;
       IntGauge* pool_max_requests;
       IntGauge* pool_max_queued;
+      IntGauge* pool_queue_timeout;
       IntGauge* max_query_mem_limit;
       IntGauge* min_query_mem_limit;
       BooleanProperty* clamp_mem_limit_query_option;
@@ -940,6 +941,9 @@ class AdmissionController {
   static std::string GetMaxRequestsForPoolDescription(
       const TPoolConfig& pool_config, int64_t cluster_size);
 
+  /// Returns the effective queue timeout for the pool in milliseconds.
+  static int64_t GetQueueTimeoutForPoolMs(const TPoolConfig& pool_config);
+
   /// Returns a maximum number of queries that should be dequeued locally from 'queue'
   /// before DequeueLoop waits on dequeue_cv_ at the top of its loop.
   /// If it can be determined that no queries can currently be run, then zero
diff --git a/be/src/service/impala-http-handler.h b/be/src/service/impala-http-handler.h
index 380fb7d..b821012 100644
--- a/be/src/service/impala-http-handler.h
+++ b/be/src/service/impala-http-handler.h
@@ -203,6 +203,7 @@ class ImpalaHttpHandler {
   ///    "pool_max_mem_resources": 10485760,
   ///    "pool_max_requests": 10,
   ///    "pool_max_queued": 10,
+  ///    "pool_queue_timeout": 60000,
   ///    "max_query_mem_limit": 0,
   ///    "min_query_mem_limit": 0,
   ///    "clamp_mem_limit_query_option": true,
diff --git a/bin/jenkins/dockerized-impala-run-tests.sh b/bin/jenkins/dockerized-impala-run-tests.sh
index f2bc8ec..e097fcc 100755
--- a/bin/jenkins/dockerized-impala-run-tests.sh
+++ b/bin/jenkins/dockerized-impala-run-tests.sh
@@ -76,8 +76,12 @@ make -j ${IMPALA_BUILD_THREADS} docker_debug_images parquet-reader
 
 source_impala_config
 
+FAIR_SCHED_CONF=/opt/impala/conf/minicluster-fair-scheduler.xml
+LLAMA_CONF=/opt/impala/conf/minicluster-llama-site.xml
 export TEST_START_CLUSTER_ARGS="--docker_network=${DOCKER_NETWORK}"
 TEST_START_CLUSTER_ARGS+=" --data_cache_dir=/tmp --data_cache_size=500m"
+TEST_START_CLUSTER_ARGS+=" --impalad_args=-fair_scheduler_allocation_path=${FAIR_SCHED_CONF}"
+TEST_START_CLUSTER_ARGS+=" --impalad_args=-llama_site_path=${LLAMA_CONF}"
 export MAX_PYTEST_FAILURES=0
 export NUM_CONCURRENT_TESTS=$(nproc)
 # Frontend tests fail because of localhost hardcoded everywhere
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 81a9aeb..be317c9 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -50,6 +50,16 @@
     "key": "admission-controller.pool-max-queued.$0"
   },
   {
+    "description": "Resource Pool $0 Queue Timeout",
+    "contexts": [
+      "RESOURCE_POOL"
+    ],
+    "label": "Resource Pool $0 Queue Timeout",
+    "units": "TIME_MS",
+    "kind": "GAUGE",
+    "key": "admission-controller.pool-queue-timeout.$0"
+  },
+  {
     "description": "Resource Pool $0 Max Query Memory Limit",
     "contexts": [
       "RESOURCE_POOL"
diff --git a/fe/src/test/resources/minicluster-fair-scheduler.xml b/fe/src/test/resources/minicluster-fair-scheduler.xml
new file mode 100644
index 0000000..fc3b949
--- /dev/null
+++ b/fe/src/test/resources/minicluster-fair-scheduler.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<allocations>
+  <queue name="root">
+    <weight>1.0</weight>
+    <schedulingPolicy>fair</schedulingPolicy>
+    <aclSubmitApps>*</aclSubmitApps>
+    <aclAdministerApps>*</aclAdministerApps>
+    <queue name="default">
+      <weight>1.0</weight>
+      <schedulingPolicy>fair</schedulingPolicy>
+      <!-- Set memory absurdly high, to force memory based AC to be enabled
+           but so that only the per-host memory limits are actually taken
+           into account for admission control. -->
+      <maxResources>30000 mb, 0 vcores</maxResources>
+    </queue>
+    <!-- A pool with no per-query limits at all. Used for tests where we want to run
+       without memory limits, etc. Only one query can be executed at a time in this
+       pool, to make things more predictable.-->
+    <queue name="no-limits">
+      <weight>1.0</weight>
+      <schedulingPolicy>fair</schedulingPolicy>
+    </queue>
+  </queue>
+  <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
+  <queuePlacementPolicy>
+    <rule name="specified" create="false"/>
+    <rule name="default"/>
+  </queuePlacementPolicy>
+</allocations>
diff --git a/fe/src/test/resources/minicluster-llama-site.xml b/fe/src/test/resources/minicluster-llama-site.xml
new file mode 100644
index 0000000..a835e7c
--- /dev/null
+++ b/fe/src/test/resources/minicluster-llama-site.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+  <property>
+    <!-- Max memory limit given to queries by default. This will allow
+         running one large query and multiple small queries on a typical
+         minicluster where each impalad has ~7gb of memory. -->
+    <name>impala.admission-control.max-query-mem-limit.root.default</name>
+    <value>4294967296</value><!--4GB-->
+  </property>
+  <property>
+    <!-- Min memory limit given to queries by default. Set low enough
+         so as not to reduce query concurrency but high enough that
+         queries don't get starved for memory because of low estimates. -->
+    <name>impala.admission-control.min-query-mem-limit.root.default</name>
+    <value>268435456</value><!--256MB-->
+  </property>
+  <property>
+    <!--Allow individual tests to set mem_limit to any value, if needed.-->
+    <name>impala.admission-control.clamp-mem-limit-query-option.root.default</name>
+    <value>false</value>
+  </property>
+  <!-- We need to increase the pool queue timeout to avoid flakiness from queries
+       getting stuck behind queries from other tests and timed out. Set to a
+       very high value to avoid failures unless queries are genuinely stuck. -->
+  <property>
+    <name>impala.admission-control.pool-queue-timeout-ms.root.default</name>
+    <value>1800000</value> <!-- 30 minutes -->
+  </property>
+  <property>
+    <name>llama.am.throttling.maximum.placed.reservations.root.no-limits</name>
+    <value>1</value>
+  </property>
+  <!-- We need to increase the pool queue timeout to avoid flakiness from queries
+       getting stuck behind queries from other tests and timed out. Set to a
+       very high value to avoid failures unless queries are genuinely stuck. -->
+  <property>
+    <name>impala.admission-control.pool-queue-timeout-ms.root.no-limits</name>
+    <value>1800000</value> <!-- 30 minutes -->
+  </property>
+</configuration>
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-mem-estimate.test b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-mem-estimate.test
index fb43c13..9984f2b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-mem-estimate.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-mem-estimate.test
@@ -6,7 +6,7 @@ select min(c_name), min(n_name)
 from tpch_parquet.customer
   join tpch_parquet.nation on c_nationkey = n_nationkey
 ---- CATCH
-Rejected query from pool default-pool: request memory needed 82.94 MB is greater than pool
+request memory needed 82.94 MB is greater than pool
  max mem resources 40.00 MB (configured statically)
 ====
 ---- QUERY
@@ -33,7 +33,7 @@ select min(c_name), min(n_name)
 from tpch_parquet.customer
   join tpch_parquet.nation on c_nationkey = n_nationkey
 ---- CATCH
-Rejected query from pool default-pool: request memory needed 45.00 MB is greater than pool
+request memory needed 45.00 MB is greater than pool
  max mem resources 40.00 MB (configured statically)
 ====
 ---- QUERY
@@ -45,7 +45,7 @@ select min(c_name), min(n_name)
 from tpch_parquet.customer
   join tpch_parquet.nation on c_nationkey = n_nationkey
 ---- CATCH
-Rejected query from pool default-pool: request memory needed 45.00 MB is greater than pool
+request memory needed 45.00 MB is greater than pool
  max mem resources 40.00 MB (configured statically)
 ====
 ---- QUERY
diff --git a/testdata/workloads/functional-query/queries/QueryTest/compute-stats.test b/testdata/workloads/functional-query/queries/QueryTest/compute-stats.test
index 38d1024..ccee48f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/compute-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/compute-stats.test
@@ -1826,5 +1826,5 @@ STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE
 set mem_limit=1m;
 compute stats tpch_parquet.customer;
 ---- CATCH
-Rejected query from pool default-pool: minimum memory reservation is greater than memory available to the query for buffer reservations
+row_regex: .*Rejected query from pool .*: minimum memory reservation is greater than memory available to the query for buffer reservations.*
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/large_strings.test b/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
index 9df5a50..848c8a1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
@@ -1,5 +1,6 @@
 ====
 ---- QUERY
+set mem_limit=4gb;
 # IMPALA-1619 group_concat() error
 select length(group_concat(l_comment, "!")) from (
 select l_comment from tpch_parquet.lineitem union all
@@ -114,6 +115,7 @@ Memory limit exceeded
 ====
 ---- QUERY
 #IMPALA-3350: Results of string functions can exceed 1GB.
+set mem_limit=4gb;
 select length(concat_ws(',', s, s, s, s)) from (
   select group_concat(l_comment, "!") s from (
     select l_comment from tpch.lineitem union all
@@ -123,6 +125,7 @@ select length(concat_ws(',', s, s, s, s)) from (
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
 select length(repeat(s, 10)) from (
   select group_concat(l_comment, "!") s from (
     select l_comment from tpch.lineitem union all
@@ -132,6 +135,7 @@ select length(repeat(s, 10)) from (
 repeat() result is larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
 select length(lpad(s, 1073741830, '!')) from (
   select group_concat(l_comment, "!") s from (
     select l_comment from tpch.lineitem union all
@@ -141,6 +145,7 @@ select length(lpad(s, 1073741830, '!')) from (
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
 select length(rpad(s, 1073741830, '~')) from (
   select group_concat(l_comment, "!") s from (
     select l_comment from tpch.lineitem union all
@@ -150,11 +155,14 @@ select length(rpad(s, 1073741830, '~')) from (
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
+set mem_limit=4gb;
 select space(1073741830);
 ---- CATCH
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
 select length(regexp_replace(s, '.', '++++++++')) from (
   select group_concat(l_comment, "!") s from (
     select l_comment from tpch.lineitem union all
@@ -164,6 +172,7 @@ select length(regexp_replace(s, '.', '++++++++')) from (
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
 select length(replace(s, ' ', '++++++++')) from (
   select group_concat(l_comment, "!") s from (
     select l_comment from tpch.lineitem union all
@@ -173,6 +182,7 @@ select length(replace(s, ' ', '++++++++')) from (
 625718301
 =====
 ---- QUERY
+set mem_limit=5gb;
 select replace(x, '+', '000') from (select (replace(s, ' ', '++++++++')) x from (
   select group_concat(l_comment, "!") s from (
     select l_comment from tpch.lineitem union all
@@ -182,16 +192,19 @@ select replace(x, '+', '000') from (select (replace(s, ' ', '++++++++')) x from
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
 select trunc(timestamp_col, space(1073741830)) from functional.alltypes
 ---- CATCH
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
 select extract(timestamp_col, space(1073741830)) from functional.alltypes
 ---- CATCH
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
 select length(madlib_encode_vector(concat_ws(',', s, s, s, s))) from (
   select group_concat(l_comment, "!") s from (
     select l_comment from tpch.lineitem union all
@@ -201,6 +214,7 @@ select length(madlib_encode_vector(concat_ws(',', s, s, s, s))) from (
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
 select length(madlib_decode_vector(concat_ws(',', s, s, s, s))) from (
   select group_concat(l_comment, "!") s from (
     select l_comment from tpch.lineitem union all
@@ -210,6 +224,7 @@ select length(madlib_decode_vector(concat_ws(',', s, s, s, s))) from (
 String length larger than allowed limit of 1 GB character data
 =====
 ---- QUERY
+set mem_limit=4gb;
 # IMPALA-4874: Generate a large row made up of multiple large strings to test RPC
 #              transmission. This uses hashing to make this difficult to compress,
 #              which results in a larger row batch.
@@ -226,6 +241,7 @@ INT,INT
 489174530,489174530
 =====
 ---- QUERY
+set mem_limit=4gb;
 select repeat('the quick brown fox', 1024 * 1024 * 100)
 ---- CATCH
 repeat() result is larger than allowed limit of 1 GB character data
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
index f1db129..8756f72 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
@@ -1,6 +1,8 @@
 ====
 ---- QUERY
 # Create a temporary table with 10MB strings for the following tests.
+# Need to bump memory estimate to prevent running out of memory.
+set mem_limit="1.2gb";
 create table bigstrs stored as parquet as
 select *, repeat(string_col, 10000000) as bigstr
 from functional.alltypes
@@ -55,6 +57,7 @@ int,bigint
 ====
 ---- QUERY
 # Row is too big to process in right side of hash join.
+set mem_limit="1gb";
 select straight_join atp.id, bs.id, atp.string_col
 from functional.alltypes atp
   join bigstrs bs on repeat(atp.string_col, 10000) = substring(bs.bigstr, 5000000, 10000) and atp.id = bs.id
@@ -64,6 +67,7 @@ Row of size 9.54 MB could not be materialized by HASH_JOIN_NODE (id=2). Increase
 ====
 ---- QUERY
 # Row is too big to process in right side of hash join.
+set mem_limit="1gb";
 set max_row_size=18m;
 select straight_join atp.id, bs.id, atp.string_col
 from functional.alltypes atp
@@ -148,6 +152,7 @@ Row of size 9.54 MB could not be materialized by SORT_NODE (id=1). Increase the
 ---- QUERY
 # Sort and analytic should be able to process the large strings if we increase the row
 # size.
+set mem_limit="1gb";
 set max_row_size=10m;
 SELECT id, int_col, substring(bigstr, 1, 10), substring(bigstr, 9999999, 1), rank
 FROM (
diff --git a/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test b/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test
index 1730d38..dfee308 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test
@@ -5,7 +5,7 @@
 set thread_reservation_limit=2;
 select count(*) from alltypes
 ---- CATCH
-row_regex:.*Rejected query from pool default-pool: thread reservation on backend '.*'
+row_regex:.*Rejected query from pool .*: thread reservation on backend '.*'
  is greater than the THREAD_RESERVATION_LIMIT query option value: 3 > 2\.
 ====
 ---- QUERY
@@ -41,7 +41,7 @@ set thread_reservation_limit=3;
 set mt_dop=4;
 select count(*) from alltypes
 ---- CATCH
-row_regex:.*Rejected query from pool default-pool: thread reservation on backend '.*'
+row_regex:.*Rejected query from pool .*: thread reservation on backend '.*'
  is greater than the THREAD_RESERVATION_LIMIT query option value: 5 > 3\.
 ====
 ---- QUERY
@@ -50,7 +50,7 @@ row_regex:.*Rejected query from pool default-pool: thread reservation on backend
 set thread_reservation_aggregate_limit=3;
 select count(*) from alltypes
 ---- CATCH
-row_regex:.*Rejected query from pool default-pool: sum of thread reservations across
+row_regex:.*Rejected query from pool .*: sum of thread reservations across
  all [0-9]+ backends is greater than the THREAD_RESERVATION_AGGREGATE_LIMIT query option
  value: [0-9]+ > 3\.
 ====
@@ -70,7 +70,7 @@ BIGINT
 set thread_reservation_aggregate_limit=3;
 select count(*) from tpch_parquet.orders
 ---- CATCH
-row_regex:.*Rejected query from pool default-pool: sum of thread reservations across
+row_regex:.*Rejected query from pool .*: sum of thread reservations across
  all [0-9]+ backends is greater than the THREAD_RESERVATION_AGGREGATE_LIMIT query option
  value: [0-9]+ > 3\.
 ====
@@ -119,6 +119,6 @@ with
  c512 as (select * from c256 union select * from c256)
 select * from c512 union select * from c512;
 ---- CATCH
-row_regex:.*Rejected query from pool default-pool: thread reservation on backend '.*'
+row_regex:.*Rejected query from pool .*: thread reservation on backend '.*'
  is greater than the THREAD_RESERVATION_LIMIT query option value: [0-9]* > 3000\.
 ====
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index 17ee7d8..173a272 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -86,6 +86,7 @@ class TestInsertQueries(ImpalaTestSuite):
       pytest.skip("Test unreasonably slow with JNI checking.")
     table_name = unique_database + ".insert_largestring"
 
+    self.client.set_configuration_option("mem_limit", "4gb")
     file_format = vector.get_value('table_format').file_format
     if file_format == "parquet":
       stored_as = file_format
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 463f7a5..b1035ab 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -385,7 +385,10 @@ class TestObservability(ImpalaTestSuite):
         r'Last row fetched:',
         r'Released admission control resources:']
     query = "select * from functional.alltypes"
-    runtime_profile = self.execute_query(query).runtime_profile
+    # Use no-limits pool so that it cannot get queued in admission control (which would
+    # add an extra event to the above timeline).
+    query_opts = {'request_pool': 'root.no-limits'}
+    runtime_profile = self.execute_query(query, query_opts).runtime_profile
     self.__verify_profile_event_sequence(event_regexes, runtime_profile)
 
   def test_query_profile_contains_instance_events(self):
diff --git a/tests/query_test/test_query_mem_limit.py b/tests/query_test/test_query_mem_limit.py
index 97d3ae7..f86412b 100644
--- a/tests/query_test/test_query_mem_limit.py
+++ b/tests/query_test/test_query_mem_limit.py
@@ -94,6 +94,8 @@ class TestQueryMemLimit(ImpalaTestSuite):
     mem_limit = copy(vector.get_value('mem_limit'))
     exec_options = copy(vector.get_value('exec_option'))
     exec_options['mem_limit'] = mem_limit
+    # Send to the no-limits pool so that no memory limits apply.
+    exec_options['request_pool'] = "root.no-limits"
     query = vector.get_value('query')
     table_format = vector.get_value('table_format')
     if mem_limit in["0", "-1"] or self.PASS_REGEX.match(mem_limit):
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 3cb5bbc..dd1981f 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -533,33 +533,43 @@ class TestWebPage(ImpalaTestSuite):
     # without the pool_name search string.
     self.client.execute("select 1")
     response_json = self.__fetch_resource_pools_json()
-    assert response_json[0]['pool_name'] == "default-pool"
 
-    response_json = self.__fetch_resource_pools_json("default-pool")
-    assert response_json[0]['pool_name'] == "default-pool"
+    # Find the default pool. It is either "root.default" if a fair-scheduler.xml file
+    # is provided or "default-pool" otherwise.
+    default_pool = None
+    for pool_json in response_json:
+      pool_name = pool_json['pool_name']
+      if pool_name in ['default-pool', 'root.default']:
+        default_pool = pool_name
+        break
+    assert default_pool is not None, \
+        "Expected a default pool to be present in {0}".format(response_json)
+
+    response_json = self.__fetch_resource_pools_json(default_pool)
+    assert response_json[0]['pool_name'] == default_pool
 
     # Make sure the reset informational stats endpoint works, both with and without the
     # pool_name search string.
     assert response_json[0]['total_admitted'] > 0
     self.get_and_check_status(
-      self.RESET_RESOURCE_POOL_STATS_URL + "?pool_name=default-pool",
+      self.RESET_RESOURCE_POOL_STATS_URL + "?pool_name={0}".format(default_pool),
       ports_to_test=[25000])
-    response_json = self.__fetch_resource_pools_json("default-pool")
+    response_json = self.__fetch_resource_pools_json(default_pool)
     assert response_json[0]['total_admitted'] == 0
 
     self.client.execute("select 1")
-    response_json = self.__fetch_resource_pools_json("default-pool")
+    response_json = self.__fetch_resource_pools_json(default_pool)
     assert response_json[0]['total_admitted'] > 0
     self.get_and_check_status(self.RESET_RESOURCE_POOL_STATS_URL, ports_to_test=[25000])
-    response_json = self.__fetch_resource_pools_json("default-pool")
+    response_json = self.__fetch_resource_pools_json(default_pool)
     pool_config = response_json[0]
     assert pool_config['total_admitted'] == 0
 
     # check that metrics exist
-    assert pool_config['max_query_mem_limit'] == 0
-    assert pool_config['min_query_mem_limit'] == 0
-    assert pool_config['max_running_queries_multiple'] == 0
-    assert pool_config['max_memory_multiple'] == 0
+    assert 'max_query_mem_limit' in pool_config
+    assert 'min_query_mem_limit' in pool_config
+    assert 'max_running_queries_multiple' in pool_config
+    assert 'max_memory_multiple' in pool_config
     assert 'clamp_mem_limit_query_option' in pool_config
     assert 'max_running_queries_derived' in pool_config
     assert 'max_queued_queries_derived' in pool_config
@@ -577,7 +587,6 @@ class TestWebPage(ImpalaTestSuite):
     assert len(responses) == 1
     response_json = json.loads(responses[0].text)
     assert 'resource_pools' in response_json
-    assert len(response_json['resource_pools']) == 1
     return response_json['resource_pools']
 
   @SkipIfBuildType.remote
diff --git a/www/admission_controller.tmpl b/www/admission_controller.tmpl
index 848d4ed..6b88401 100644
--- a/www/admission_controller.tmpl
+++ b/www/admission_controller.tmpl
@@ -32,6 +32,7 @@ Example of json received from the impala server
             "pool_max_mem_resources": 10485760,
             "pool_max_requests": 10,
             "pool_max_queued": 10,
+            "pool_queue_timeout": 60000,
             "max_query_mem_limit": 0,
             "min_query_mem_limit": 0,
             "clamp_mem_limit_query_option": true,
@@ -245,6 +246,10 @@ Time since last statestore update containing admission control topic state (ms):
       <td>{{pool_max_queued}}</td>
     </tr>
     <tr>
+      <td>Queue Timeout (ms)</td>
+      <td>{{pool_queue_timeout}}</td>
+    </tr>
+    <tr>
       <td><b>Min</b> Query MEM_LIMIT range</td>
       <td class='memory'>{{min_query_mem_limit}}</td>
     </tr>


[impala] 02/02: IMPALA-8928: Add MEM_LIMIT_EXECUTORS query option

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 104a454d3e88d96ec28837e1032de8b292daa84d
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Fri Sep 6 12:54:32 2019 -0700

    IMPALA-8928: Add MEM_LIMIT_EXECUTORS query option
    
    This developer only option provides a way to set the memory limit for
    only the executors which would be useful for writing tests in the
    future.
    
    Testing:
    - Added a simple sanity check
    
    Change-Id: I20dfd4e8fc7ffd9130db9f942efa78965c724e18
    Reviewed-on: http://gerrit.cloudera.org:8080/14294
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/query-schedule.cc               | 19 +++++++++++++------
 be/src/service/query-options-test.cc              |  1 +
 be/src/service/query-options.cc                   |  8 ++++++++
 be/src/service/query-options.h                    |  5 +++--
 common/thrift/ImpalaInternalService.thrift        |  3 +++
 common/thrift/ImpalaService.thrift                |  8 ++++++++
 tests/custom_cluster/test_admission_controller.py | 19 ++++++++++++++++++-
 7 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 86fe8fd..a86be2b 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -279,14 +279,14 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
 
   per_backend_mem_to_admit_ = 0;
   coord_backend_mem_to_admit_ = 0;
-  bool has_query_option = false;
+  bool is_mem_limit_set = false;
   if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
     per_backend_mem_to_admit_ = query_options().mem_limit;
     coord_backend_mem_to_admit_ = query_options().mem_limit;
-    has_query_option = true;
+    is_mem_limit_set = true;
   }
 
-  if (!has_query_option) {
+  if (!is_mem_limit_set) {
     per_backend_mem_to_admit_ = GetPerExecutorMemoryEstimate();
     coord_backend_mem_to_admit_ = use_dedicated_coord_estimates ?
         GetDedicatedCoordMemoryEstimate() :
@@ -302,11 +302,11 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     }
   }
 
-  if (!has_query_option || pool_cfg.clamp_mem_limit_query_option) {
+  if (!is_mem_limit_set || pool_cfg.clamp_mem_limit_query_option) {
     if (pool_cfg.min_query_mem_limit > 0) {
       per_backend_mem_to_admit_ =
           max(per_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
-      if (!use_dedicated_coord_estimates || has_query_option) {
+      if (!use_dedicated_coord_estimates || is_mem_limit_set) {
         // The minimum mem limit option does not apply to dedicated coordinators -
         // this would result in over-reserving of memory. Treat coordinator and
         // executor mem limits the same if the query option was explicitly set.
@@ -332,13 +332,20 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     per_backend_mem_to_admit_ = 0;
   }
 
-  if (mimic_old_behaviour && !has_query_option) {
+  if (mimic_old_behaviour && !is_mem_limit_set) {
     per_backend_mem_limit_ = -1;
     coord_backend_mem_limit_ = -1;
   } else {
     per_backend_mem_limit_ = per_backend_mem_to_admit_;
     coord_backend_mem_limit_ = coord_backend_mem_to_admit_;
   }
+
+  // Finally, enforce the MEM_LIMIT_EXECUTORS query option if MEM_LIMIT is not specified.
+  if (!is_mem_limit_set && query_options().__isset.mem_limit_executors
+      && query_options().mem_limit_executors > 0) {
+    per_backend_mem_to_admit_ = query_options().mem_limit_executors;
+    per_backend_mem_limit_ = per_backend_mem_to_admit_;
+  }
 }
 
 void QuerySchedule::set_executor_group(string executor_group) {
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 30d7791..e587103 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -150,6 +150,7 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(max_mem_estimate_for_admission), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(scan_bytes_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(topn_bytes_limit), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(mem_limit_executors), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index cd7007b..5d1ab07 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -899,6 +899,14 @@ Status impala::SetQueryOption(const string& key, const string& value,
             parquet_object_store_split_size);
         break;
       }
+      case TImpalaQueryOptions::MEM_LIMIT_EXECUTORS: {
+        // Parse the mem limit spec and validate it.
+        int64_t bytes_limit;
+        RETURN_IF_ERROR(
+            ParseMemValue(value, "query memory limit for executors", &bytes_limit));
+        query_options->__set_mem_limit_executors(bytes_limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index baabd6a..01112ad 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::PARQUET_OBJECT_STORE_SPLIT_SIZE + 1);\
+      TImpalaQueryOptions::MEM_LIMIT_EXECUTORS + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -191,7 +191,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(now_string, NOW_STRING, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(parquet_object_store_split_size, PARQUET_OBJECT_STORE_SPLIT_SIZE,\
-      TQueryOptionLevel::ADVANCED)
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index d52e2ca..48e89fc 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -400,6 +400,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   96: optional i64 parquet_object_store_split_size = 268435456;
+
+  // See comment in ImpalaService.thrift
+  97: optional i64 mem_limit_executors = 0
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 8ba1066..2006f97 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -475,6 +475,14 @@ enum TImpalaQueryOptions {
   // Parquet files written by Impala (Impala writes Parquet files with a single row
   // group per file). Must be >= 1 MB.
   PARQUET_OBJECT_STORE_SPLIT_SIZE = 95
+
+  // For testing purposes only. A per executor approximate limit on the memory consumption
+  // of this query. Only applied if MEM_LIMIT is not specified.
+  // unspecified or a limit of 0 means no limit;
+  // Otherwise specified either as:
+  // a) an int (= number of bytes);
+  // b) a float followed by "M" (MB) or "G" (GB)
+  MEM_LIMIT_EXECUTORS = 96
 }
 
 // The summary of a DML statement.
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 4698e77..27c3182 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -630,7 +630,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     """Planner tests to add coverage for coordinator estimates when using dedicated
     coordinators. Also includes coverage for verifying cluster memory admitted."""
     vector_copy = copy(vector)
-    exec_options = vector.get_value('exec_option')
+    exec_options = vector_copy.get_value('exec_option')
     # Remove num_nodes from the options to allow test case runner to set it in one of
     # the test cases.
     del exec_options['num_nodes']
@@ -638,6 +638,23 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.run_test_case('QueryTest/dedicated-coord-mem-estimates', vector_copy,
                        unique_database)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=2)
+  def test_mem_limit_executors(self, vector, unique_database):
+    """Verify that the query option mem_limit_executors is only enforced on the
+    executors."""
+    expected_exec_mem_limit = "999999999"
+    ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
+    self.client.set_configuration({"MEM_LIMIT_EXECUTORS": expected_exec_mem_limit})
+    handle = self.client.execute_async(QUERY.format(1))
+    self.client.wait_for_finished_timeout(handle, 1000)
+    expected_mem_limits = self.__get_mem_limits_admission_debug_page()
+    assert expected_mem_limits['executor'] > expected_mem_limits[
+      'coordinator'], expected_mem_limits
+    assert expected_mem_limits['executor'] == float(
+      expected_exec_mem_limit), expected_mem_limits
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,