You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2019/02/12 17:30:57 UTC
[impala] 03/03: IMPALA-6035: Add query options to limit thread reservation
This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 22fb381503c713cbbe431fa059968b5c1dab9ec5
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu May 31 16:25:26 2018 -0700
IMPALA-6035: Add query options to limit thread reservation
Adds two options: THREAD_RESERVATION_LIMIT and
THREAD_RESERVATION_AGGREGATE_LIMIT, which are both enforced by admission
control based on planner resource requirements and the schedule. The
mechanism used is the same as the minimum reservation checks.
THREAD_RESERVATION_LIMIT limits the total number of reserved threads in
fragments scheduled on a single backend.
THREAD_RESERVATION_AGGREGATE_LIMIT limits the sum of reserved threads
across all fragments.
This also slightly improves the minimum reservation error message to
include the host name.
Testing:
Added end-to-end tests that exercise the code paths.
Ran core tests.
Change-Id: I5b5bbbdad5cd6b24442eb6c99a4d38c2ad710007
Reviewed-on: http://gerrit.cloudera.org:8080/10365
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/12429
Reviewed-by: Quanlong Huang <hu...@gmail.com>
---
be/src/scheduling/admission-controller.cc | 68 ++++++++++----
be/src/scheduling/query-schedule.h | 14 ++-
be/src/scheduling/scheduler.cc | 1 +
be/src/service/query-options-test.cc | 2 +
be/src/service/query-options.cc | 20 ++++
be/src/service/query-options.h | 6 +-
common/thrift/ImpalaInternalService.thrift | 6 ++
common/thrift/ImpalaService.thrift | 9 ++
.../admission-reject-min-reservation.test | 5 +-
.../queries/QueryTest/runtime_row_filters.test | 8 +-
.../queries/QueryTest/thread-limits.test | 104 +++++++++++++++++++++
tests/query_test/test_resource_limits.py | 40 ++++++++
12 files changed, 254 insertions(+), 29 deletions(-)
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index ce6a82c..f94d454 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -120,8 +120,8 @@ const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION =
"plan. See the query profile for more information about the per-node memory "
"requirements.";
const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION =
- "minimum memory reservation is greater than memory available to the query "
- "for buffer reservations. Increase the buffer_pool_limit to $0. See the query "
+ "minimum memory reservation on backend '$0' is greater than memory available to the "
+ "query for buffer reservations. Increase the buffer_pool_limit to $1. See the query "
"profile for more information about the per-node memory requirements.";
const string REASON_MIN_RESERVATION_OVER_POOL_MEM =
"minimum memory reservation needed is greater than pool max mem resources. Pool "
@@ -140,6 +140,12 @@ const string REASON_REQ_OVER_POOL_MEM =
const string REASON_REQ_OVER_NODE_MEM =
"request memory needed $0 per node is greater than process mem limit $1 of $2.\n\n"
"Use the MEM_LIMIT query option to indicate how much memory is required per node.";
+const string REASON_THREAD_RESERVATION_LIMIT_EXCEEDED =
+ "thread reservation on backend '$0' is greater than the THREAD_RESERVATION_LIMIT "
+ "query option value: $1 > $2.";
+const string REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED =
+ "sum of thread reservations across all $0 backends is greater than the "
+ "THREAD_RESERVATION_AGGREGATE_LIMIT query option value: $1 > $2.";
// Queue decision details
// $0 = num running queries, $1 = num queries limit
@@ -406,17 +412,24 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
// the checks isn't particularly important, though some thought was given to ordering
// them in a way that might make the sense for a user.
- // Compute the max (over all backends) min_mem_reservation_bytes, the cluster total
- // (across all backends) min_mem_reservation_bytes and the min (over all backends)
+ // Compute the max (over all backends) and cluster total (across all backends) for
+ // min_mem_reservation_bytes and thread_reservation and the min (over all backends)
// min_proc_mem_limit.
- int64_t max_min_mem_reservation_bytes = -1;
+ pair<const TNetworkAddress*, int64_t> largest_min_mem_reservation(nullptr, -1);
int64_t cluster_min_mem_reservation_bytes = 0;
+ pair<const TNetworkAddress*, int64_t> max_thread_reservation(nullptr, 0);
pair<const TNetworkAddress*, int64_t> min_proc_mem_limit(
nullptr, std::numeric_limits<int64_t>::max());
+ int64_t cluster_thread_reservation = 0;
for (const auto& e : schedule.per_backend_exec_params()) {
cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes;
- if (e.second.min_mem_reservation_bytes > max_min_mem_reservation_bytes) {
- max_min_mem_reservation_bytes = e.second.min_mem_reservation_bytes;
+ if (e.second.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
+ largest_min_mem_reservation =
+ make_pair(&e.first, e.second.min_mem_reservation_bytes);
+ }
+ cluster_thread_reservation += e.second.thread_reservation;
+ if (e.second.thread_reservation > max_thread_reservation.second) {
+ max_thread_reservation = make_pair(&e.first, e.second.thread_reservation);
}
if (e.second.proc_mem_limit < min_proc_mem_limit.second) {
min_proc_mem_limit.first = &e.first;
@@ -425,27 +438,46 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
}
// Checks related to the min buffer reservation against configured query memory limits:
- if (schedule.query_options().__isset.buffer_pool_limit
- && schedule.query_options().buffer_pool_limit > 0) {
- if (max_min_mem_reservation_bytes > schedule.query_options().buffer_pool_limit) {
+ const TQueryOptions& query_opts = schedule.query_options();
+ if (query_opts.__isset.buffer_pool_limit && query_opts.buffer_pool_limit > 0) {
+ if (largest_min_mem_reservation.second > query_opts.buffer_pool_limit) {
*rejection_reason = Substitute(REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION,
- PrintBytes(max_min_mem_reservation_bytes));
+ TNetworkAddressToString(*largest_min_mem_reservation.first),
+ PrintBytes(largest_min_mem_reservation.second));
return true;
}
- } else if (schedule.query_options().__isset.mem_limit
- && schedule.query_options().mem_limit > 0) {
- const int64_t mem_limit = schedule.query_options().mem_limit;
+ } else if (query_opts.__isset.mem_limit && query_opts.mem_limit > 0) {
+ // If buffer_pool_limit is not explicitly set, it's calculated from mem_limit.
+ const int64_t mem_limit = query_opts.mem_limit;
const int64_t max_reservation =
ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
- if (max_min_mem_reservation_bytes > max_reservation) {
- const int64_t required_mem_limit =
- ReservationUtil::GetMinMemLimitFromReservation(max_min_mem_reservation_bytes);
+ if (largest_min_mem_reservation.second > max_reservation) {
+ const int64_t required_mem_limit = ReservationUtil::GetMinMemLimitFromReservation(
+ largest_min_mem_reservation.second);
*rejection_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
- PrintBytes(max_min_mem_reservation_bytes), PrintBytes(required_mem_limit));
+ PrintBytes(largest_min_mem_reservation.second), PrintBytes(required_mem_limit));
return true;
}
}
+ // Check thread reservation limits.
+ if (query_opts.__isset.thread_reservation_limit
+ && query_opts.thread_reservation_limit > 0
+ && max_thread_reservation.second > query_opts.thread_reservation_limit) {
+ *rejection_reason = Substitute(REASON_THREAD_RESERVATION_LIMIT_EXCEEDED,
+ TNetworkAddressToString(*max_thread_reservation.first),
+ max_thread_reservation.second, query_opts.thread_reservation_limit);
+ return true;
+ }
+ if (query_opts.__isset.thread_reservation_aggregate_limit
+ && query_opts.thread_reservation_aggregate_limit > 0
+ && cluster_thread_reservation > query_opts.thread_reservation_aggregate_limit) {
+ *rejection_reason = Substitute(REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED,
+ schedule.per_backend_exec_params().size(), cluster_thread_reservation,
+ query_opts.thread_reservation_aggregate_limit);
+ return true;
+ }
+
// Checks related to pool max_requests:
if (pool_cfg.max_requests == 0) {
*rejection_reason = REASON_DISABLED_REQUESTS_LIMIT;
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 870c970..b43cc7b 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -46,7 +46,8 @@ typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
FragmentScanRangeAssignment;
/// Execution parameters for a single backend. Computed by Scheduler::Schedule(), set
-/// via QuerySchedule::set_per_backend_exec_params(). Used as an input to a BackendState.
+/// via QuerySchedule::set_per_backend_exec_params(). Used as an input to
+/// AdmissionController and a BackendState.
struct BackendExecParams {
/// The fragment instance params assigned to this backend. All instances of a
/// particular fragment are contiguous in this vector. Query lifetime;
@@ -58,17 +59,22 @@ struct BackendExecParams {
// concurrently-executing operators at any point in query execution. It may be less
// than the initial reservation total claims (below) if execution of some operators
// never overlaps, which allows reuse of reservations.
- int64_t min_mem_reservation_bytes;
+ int64_t min_mem_reservation_bytes = 0;
// Total of the initial buffer reservations that we expect to be claimed on this
// backend for all fragment instances in instance_params. I.e. the sum over all
// operators in all fragment instances that execute on this backend. This is used for
// an optimization in InitialReservation. Measured in bytes.
- int64_t initial_mem_reservation_total_claims;
+ int64_t initial_mem_reservation_total_claims = 0;
+
+ // Total thread reservation for fragment instances scheduled on this backend. This is
+ // the peak number of threads that may be required by the
+ // concurrently-executing fragment instances at any point in query execution.
+ int64_t thread_reservation = 0;
// The process memory limit of this backend. Obtained from the scheduler's executors
// configuration which is updated by membership updates from the statestore.
- int64_t proc_mem_limit;
+ int64_t proc_mem_limit = 0;
};
/// map from an impalad host address to the list of assigned fragment instance params.
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 0c9d800..5a67a74 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -783,6 +783,7 @@ void Scheduler::ComputeBackendExecParams(
be_params.min_mem_reservation_bytes += f.fragment.min_mem_reservation_bytes;
be_params.initial_mem_reservation_total_claims +=
f.fragment.initial_mem_reservation_total_claims;
+ be_params.thread_reservation += f.fragment.thread_reservation;
}
}
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index ed8d986..cb73c9a 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -230,6 +230,8 @@ TEST(QueryOptions, SetIntOptions) {
{MAKE_OPTIONDEF(batch_size), {0, 65536}},
{MAKE_OPTIONDEF(query_timeout_s), {0, I32_MAX}},
{MAKE_OPTIONDEF(exec_time_limit_s), {0, I32_MAX}},
+ {MAKE_OPTIONDEF(thread_reservation_limit), {-1, I32_MAX}},
+ {MAKE_OPTIONDEF(thread_reservation_aggregate_limit), {-1, I32_MAX}},
};
for (const auto& test_case : case_set) {
const OptionDef<int32_t>& option_def = test_case.first;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index a0b7227..08b19c7 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -673,6 +673,26 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_max_mem_estimate_for_admission(bytes_limit);
break;
}
+ case TImpalaQueryOptions::THREAD_RESERVATION_LIMIT:
+ case TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT: {
+ // Parsing logic is identical for these two options.
+ StringParser::ParseResult status;
+ int val = StringParser::StringToInt<int>(value.c_str(), value.size(), &status);
+ if (status != StringParser::PARSE_SUCCESS) {
+ return Status(Substitute("Invalid thread count: '$0'.", value));
+ }
+ if (val < -1) {
+ return Status(Substitute("Invalid thread count: '$0'. "
+ "Only -1 and non-negative values are allowed.", val));
+ }
+ if (option == TImpalaQueryOptions::THREAD_RESERVATION_LIMIT) {
+ query_options->__set_thread_reservation_limit(val);
+ } else {
+ DCHECK_EQ(option, TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT);
+ query_options->__set_thread_reservation_aggregate_limit(val);
+ }
+ break;
+ }
default:
// We hit this DCHECK(false) if we forgot to add the corresponding entry here
// when we add a new query option.
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 24eaed2..dafa0a0 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// the DCHECK.
#define QUERY_OPTS_TABLE\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
- TImpalaQueryOptions::MAX_MEM_ESTIMATE_FOR_ADMISSION + 1);\
+ TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT + 1);\
QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED,\
TQueryOptionLevel::DEPRECATED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
@@ -137,6 +137,10 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
TQueryOptionLevel::ADVANCED)\
QUERY_OPT_FN(max_mem_estimate_for_admission, MAX_MEM_ESTIMATE_FOR_ADMISSION,\
TQueryOptionLevel::ADVANCED)\
+ QUERY_OPT_FN(thread_reservation_limit, THREAD_RESERVATION_LIMIT,\
+ TQueryOptionLevel::REGULAR)\
+ QUERY_OPT_FN(thread_reservation_aggregate_limit, THREAD_RESERVATION_AGGREGATE_LIMIT,\
+ TQueryOptionLevel::REGULAR)\
;
/// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 464fe87..2c00090 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -311,6 +311,12 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift.
65: optional i64 max_mem_estimate_for_admission = 0;
+
+ // See comment in ImpalaService.thrift.
+ 66: optional i32 thread_reservation_limit = 0;
+
+ // See comment in ImpalaService.thrift.
+ 67: optional i32 thread_reservation_aggregate_limit = 0;
}
// Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index cdc5bd0..7b6afef 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -322,6 +322,15 @@ enum TImpalaQueryOptions {
// workaround if the planner's memory estimate is too high and prevents a runnable
// query from being admitted. 0 or -1 means this has no effect. Defaults to 0.
MAX_MEM_ESTIMATE_FOR_ADMISSION,
+
+ // Admission control will reject queries when the number of reserved threads per backend
+ // for the query exceeds this number. 0 or -1 means this has no effect.
+ THREAD_RESERVATION_LIMIT,
+
+ // Admission control will reject queries when the total number of reserved threads
+ // across all backends for the query exceeds this number. 0 or -1 means this has no
+ // effect.
+ THREAD_RESERVATION_AGGREGATE_LIMIT,
}
// The summary of a DML statement.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
index 2ddf999..e658c09 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
@@ -19,8 +19,9 @@ minimum memory reservation needed is greater than pool max mem resources.
set buffer_pool_limit=10mb;
select distinct * from functional_parquet.alltypesagg
---- CATCH
-minimum memory reservation is greater than memory available to the query
- for buffer reservations. Increase the buffer_pool_limit to 68.09 MB.
+row_regex:.*minimum memory reservation on backend '.*'
+ is greater than memory available to the query
+ for buffer reservations\. Increase the buffer_pool_limit to 68.09 MB\.
====
---- QUERY
set mem_limit=1024;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
index d16e9c0..fe61741 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
@@ -401,10 +401,10 @@ select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
on a.month = b.id and b.int_col = -3
---- RESULTS
---- CATCH
-Rejected query from pool default-pool: minimum memory reservation is
- greater than memory available to the query for buffer reservations. Increase
- the buffer_pool_limit to 290.17 MB. See the query profile for more information
- about the per-node memory requirements.
+row_regex:.*Rejected query from pool default-pool: minimum memory reservation on
+ backend '.*' is greater than memory available to the query for buffer reservations\.
+ Increase the buffer_pool_limit to 290.17 MB\. See the query profile for more information
+ about the per-node memory requirements\.
====
---- QUERY
# Confirm that with broadcast join, memory limit is not hit.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test b/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test
new file mode 100644
index 0000000..ad67c70
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test
@@ -0,0 +1,104 @@
+====
+---- QUERY
+# Test per-backend limit. The coordinator will get 2 fragments + 1 scanner thread
+# scheduled on it.
+set thread_reservation_limit=2;
+select count(*) from alltypes
+---- CATCH
+row_regex:.*Rejected query from pool default-pool: thread reservation on backend '.*'
+ is greater than the THREAD_RESERVATION_LIMIT query option value: 3 > 2\.
+====
+---- QUERY
+# Test the boundary case where the thread reservation is exactly the required threads.
+set thread_reservation_limit=3;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+# Zero means no limit.
+set thread_reservation_limit=0;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+# -1 means no limit.
+set thread_reservation_limit=-1;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+# MT_DOP is factored into reservation.
+set thread_reservation_limit=3;
+set mt_dop=4;
+select count(*) from alltypes
+---- CATCH
+row_regex:.*Rejected query from pool default-pool: thread reservation on backend '.*'
+ is greater than the THREAD_RESERVATION_LIMIT query option value: 5 > 3\.
+====
+---- QUERY
+# Higher aggregate limit can reject based on sum of total threads. Assume >= 2 impalads
+# with scan ranges plus the coordinator fragment.
+set thread_reservation_aggregate_limit=3;
+select count(*) from alltypes
+---- CATCH
+row_regex:.*Rejected query from pool default-pool: sum of thread reservations across
+ all [0-9]+ backends is greater than the THREAD_RESERVATION_AGGREGATE_LIMIT query option
+ value: [0-9]+ > 3\.
+====
+---- QUERY
+# tpch_parquet.nation has only one file, which means only one instance of the scan fragment,
+# which means it only has 3 aggregate threads.
+set thread_reservation_aggregate_limit=3;
+select count(*) from tpch_parquet.nation
+---- TYPES
+BIGINT
+---- RESULTS
+25
+====
+---- QUERY
+# tpch_parquet.orders has two files, which means more instances of the scan fragment,
+# which means it has more than 3 aggregate threads, assuming at least two impalads.
+set thread_reservation_aggregate_limit=3;
+select count(*) from tpch_parquet.orders
+---- CATCH
+row_regex:.*Rejected query from pool default-pool: sum of thread reservations across
+ all [0-9]+ backends is greater than the THREAD_RESERVATION_AGGREGATE_LIMIT query option
+ value: [0-9]+ > 3\.
+====
+---- QUERY
+# Running on a single impalad gets us under the aggregate limit.
+set num_nodes=1;
+set thread_reservation_aggregate_limit=3;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+# 0 means no limit.
+set thread_reservation_aggregate_limit=0;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+# -1 means no limit.
+set thread_reservation_aggregate_limit=-1;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
diff --git a/tests/query_test/test_resource_limits.py b/tests/query_test/test_resource_limits.py
new file mode 100644
index 0000000..c0fc7b5
--- /dev/null
+++ b/tests/query_test/test_resource_limits.py
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIfLocal
+from tests.common.test_dimensions import create_parquet_dimension
+
+
+class TestResourceLimits(ImpalaTestSuite):
+ """Test resource limit functionality."""
+
+ @classmethod
+ def get_workload(self):
+ return 'functional-query'
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestResourceLimits, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_dimension(
+ create_parquet_dimension(cls.get_workload()))
+
+ @SkipIfLocal.multiple_impalad
+ def test_thread_limits(self, vector):
+ # Remove option from vector to allow test file to override it per query.
+ del vector.get_value('exec_option')['num_nodes']
+ self.run_test_case('QueryTest/thread-limits', vector)