You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2019/02/12 17:30:57 UTC

[impala] 03/03: IMPALA-6035: Add query options to limit thread reservation

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 22fb381503c713cbbe431fa059968b5c1dab9ec5
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu May 31 16:25:26 2018 -0700

    IMPALA-6035: Add query options to limit thread reservation
    
    Adds two options: THREAD_RESERVATION_LIMIT and
    THREAD_RESERVATION_AGGREGATE_LIMIT, which are both enforced by admission
    control based on planner resource requirements and the schedule. The
    mechanism used is the same as the minimum reservation checks.
    
    THREAD_RESERVATION_LIMIT limits the total number of reserved threads in
    fragments scheduled on a single backend.
    THREAD_RESERVATION_AGGREGATE_LIMIT limits the sum of reserved threads
    across all fragments.
    
    This also slightly improves the minimum reservation error message to
    include the host name.
    
    Testing:
    Added end-to-end tests that exercise the code paths.
    
    Ran core tests.
    
    Change-Id: I5b5bbbdad5cd6b24442eb6c99a4d38c2ad710007
    Reviewed-on: http://gerrit.cloudera.org:8080/10365
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/12429
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
---
 be/src/scheduling/admission-controller.cc          |  68 ++++++++++----
 be/src/scheduling/query-schedule.h                 |  14 ++-
 be/src/scheduling/scheduler.cc                     |   1 +
 be/src/service/query-options-test.cc               |   2 +
 be/src/service/query-options.cc                    |  20 ++++
 be/src/service/query-options.h                     |   6 +-
 common/thrift/ImpalaInternalService.thrift         |   6 ++
 common/thrift/ImpalaService.thrift                 |   9 ++
 .../admission-reject-min-reservation.test          |   5 +-
 .../queries/QueryTest/runtime_row_filters.test     |   8 +-
 .../queries/QueryTest/thread-limits.test           | 104 +++++++++++++++++++++
 tests/query_test/test_resource_limits.py           |  40 ++++++++
 12 files changed, 254 insertions(+), 29 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index ce6a82c..f94d454 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -120,8 +120,8 @@ const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION =
     "plan. See the query profile for more information about the per-node memory "
     "requirements.";
 const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION =
-    "minimum memory reservation is greater than memory available to the query "
-    "for buffer reservations. Increase the buffer_pool_limit to $0. See the query "
+    "minimum memory reservation on backend '$0' is greater than memory available to the "
+    "query for buffer reservations. Increase the buffer_pool_limit to $1. See the query "
     "profile for more information about the per-node memory requirements.";
 const string REASON_MIN_RESERVATION_OVER_POOL_MEM =
     "minimum memory reservation needed is greater than pool max mem resources. Pool "
@@ -140,6 +140,12 @@ const string REASON_REQ_OVER_POOL_MEM =
 const string REASON_REQ_OVER_NODE_MEM =
     "request memory needed $0 per node is greater than process mem limit $1 of $2.\n\n"
     "Use the MEM_LIMIT query option to indicate how much memory is required per node.";
+const string REASON_THREAD_RESERVATION_LIMIT_EXCEEDED =
+    "thread reservation on backend '$0' is greater than the THREAD_RESERVATION_LIMIT "
+    "query option value: $1 > $2.";
+const string REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED =
+    "sum of thread reservations across all $0 backends is greater than the "
+    "THREAD_RESERVATION_AGGREGATE_LIMIT query option value: $1 > $2.";
 
 // Queue decision details
 // $0 = num running queries, $1 = num queries limit
@@ -406,17 +412,24 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
   // the checks isn't particularly important, though some thought was given to ordering
   // them in a way that might make the sense for a user.
 
-  // Compute the max (over all backends) min_mem_reservation_bytes, the cluster total
-  // (across all backends) min_mem_reservation_bytes and the min (over all backends)
+  // Compute the max (over all backends) and cluster total (across all backends) for
+  // min_mem_reservation_bytes and thread_reservation and the min (over all backends)
   // min_proc_mem_limit.
-  int64_t max_min_mem_reservation_bytes = -1;
+  pair<const TNetworkAddress*, int64_t> largest_min_mem_reservation(nullptr, -1);
   int64_t cluster_min_mem_reservation_bytes = 0;
+  pair<const TNetworkAddress*, int64_t> max_thread_reservation(nullptr, 0);
   pair<const TNetworkAddress*, int64_t> min_proc_mem_limit(
       nullptr, std::numeric_limits<int64_t>::max());
+  int64_t cluster_thread_reservation = 0;
   for (const auto& e : schedule.per_backend_exec_params()) {
     cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes;
-    if (e.second.min_mem_reservation_bytes > max_min_mem_reservation_bytes) {
-      max_min_mem_reservation_bytes = e.second.min_mem_reservation_bytes;
+    if (e.second.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
+      largest_min_mem_reservation =
+          make_pair(&e.first, e.second.min_mem_reservation_bytes);
+    }
+    cluster_thread_reservation += e.second.thread_reservation;
+    if (e.second.thread_reservation > max_thread_reservation.second) {
+      max_thread_reservation = make_pair(&e.first, e.second.thread_reservation);
     }
     if (e.second.proc_mem_limit < min_proc_mem_limit.second) {
       min_proc_mem_limit.first = &e.first;
@@ -425,27 +438,46 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
   }
 
   // Checks related to the min buffer reservation against configured query memory limits:
-  if (schedule.query_options().__isset.buffer_pool_limit
-      && schedule.query_options().buffer_pool_limit > 0) {
-    if (max_min_mem_reservation_bytes > schedule.query_options().buffer_pool_limit) {
+  const TQueryOptions& query_opts = schedule.query_options();
+  if (query_opts.__isset.buffer_pool_limit && query_opts.buffer_pool_limit > 0) {
+    if (largest_min_mem_reservation.second > query_opts.buffer_pool_limit) {
       *rejection_reason = Substitute(REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION,
-          PrintBytes(max_min_mem_reservation_bytes));
+          TNetworkAddressToString(*largest_min_mem_reservation.first),
+          PrintBytes(largest_min_mem_reservation.second));
       return true;
     }
-  } else if (schedule.query_options().__isset.mem_limit
-      && schedule.query_options().mem_limit > 0) {
-    const int64_t mem_limit = schedule.query_options().mem_limit;
+  } else if (query_opts.__isset.mem_limit && query_opts.mem_limit > 0) {
+    // If buffer_pool_limit is not explicitly set, it's calculated from mem_limit.
+    const int64_t mem_limit = query_opts.mem_limit;
     const int64_t max_reservation =
         ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
-    if (max_min_mem_reservation_bytes > max_reservation) {
-      const int64_t required_mem_limit =
-          ReservationUtil::GetMinMemLimitFromReservation(max_min_mem_reservation_bytes);
+    if (largest_min_mem_reservation.second > max_reservation) {
+      const int64_t required_mem_limit = ReservationUtil::GetMinMemLimitFromReservation(
+          largest_min_mem_reservation.second);
       *rejection_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
-          PrintBytes(max_min_mem_reservation_bytes), PrintBytes(required_mem_limit));
+          PrintBytes(largest_min_mem_reservation.second), PrintBytes(required_mem_limit));
       return true;
     }
   }
 
+  // Check thread reservation limits.
+  if (query_opts.__isset.thread_reservation_limit
+      && query_opts.thread_reservation_limit > 0
+      && max_thread_reservation.second > query_opts.thread_reservation_limit) {
+    *rejection_reason = Substitute(REASON_THREAD_RESERVATION_LIMIT_EXCEEDED,
+        TNetworkAddressToString(*max_thread_reservation.first),
+        max_thread_reservation.second, query_opts.thread_reservation_limit);
+    return true;
+  }
+  if (query_opts.__isset.thread_reservation_aggregate_limit
+      && query_opts.thread_reservation_aggregate_limit > 0
+      && cluster_thread_reservation > query_opts.thread_reservation_aggregate_limit) {
+    *rejection_reason = Substitute(REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED,
+        schedule.per_backend_exec_params().size(), cluster_thread_reservation,
+        query_opts.thread_reservation_aggregate_limit);
+    return true;
+  }
+
   // Checks related to pool max_requests:
   if (pool_cfg.max_requests == 0) {
     *rejection_reason = REASON_DISABLED_REQUESTS_LIMIT;
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 870c970..b43cc7b 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -46,7 +46,8 @@ typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
     FragmentScanRangeAssignment;
 
 /// Execution parameters for a single backend. Computed by Scheduler::Schedule(), set
-/// via QuerySchedule::set_per_backend_exec_params(). Used as an input to a BackendState.
+/// via QuerySchedule::set_per_backend_exec_params(). Used as an input to
+/// AdmissionController and a BackendState.
 struct BackendExecParams {
   /// The fragment instance params assigned to this backend. All instances of a
   /// particular fragment are contiguous in this vector. Query lifetime;
@@ -58,17 +59,22 @@ struct BackendExecParams {
   // concurrently-executing operators at any point in query execution. It may be less
   // than the initial reservation total claims (below) if execution of some operators
   // never overlaps, which allows reuse of reservations.
-  int64_t min_mem_reservation_bytes;
+  int64_t min_mem_reservation_bytes = 0;
 
   // Total of the initial buffer reservations that we expect to be claimed on this
   // backend for all fragment instances in instance_params. I.e. the sum over all
   // operators in all fragment instances that execute on this backend. This is used for
   // an optimization in InitialReservation. Measured in bytes.
-  int64_t initial_mem_reservation_total_claims;
+  int64_t initial_mem_reservation_total_claims = 0;
+
+  // Total thread reservation for fragment instances scheduled on this backend. This is
+  // the peak number of required threads that may be required by the
+  // concurrently-executing fragment instances at any point in query execution.
+  int64_t thread_reservation = 0;
 
   // The process memory limit of this backend. Obtained from the scheduler's executors
   // configuration which is updated by membership updates from the statestore.
-  int64_t proc_mem_limit;
+  int64_t proc_mem_limit = 0;
 };
 
 /// map from an impalad host address to the list of assigned fragment instance params.
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 0c9d800..5a67a74 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -783,6 +783,7 @@ void Scheduler::ComputeBackendExecParams(
       be_params.min_mem_reservation_bytes += f.fragment.min_mem_reservation_bytes;
       be_params.initial_mem_reservation_total_claims +=
           f.fragment.initial_mem_reservation_total_claims;
+      be_params.thread_reservation += f.fragment.thread_reservation;
     }
   }
 
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index ed8d986..cb73c9a 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -230,6 +230,8 @@ TEST(QueryOptions, SetIntOptions) {
       {MAKE_OPTIONDEF(batch_size),                     {0, 65536}},
       {MAKE_OPTIONDEF(query_timeout_s),                {0, I32_MAX}},
       {MAKE_OPTIONDEF(exec_time_limit_s),              {0, I32_MAX}},
+      {MAKE_OPTIONDEF(thread_reservation_limit),       {-1, I32_MAX}},
+      {MAKE_OPTIONDEF(thread_reservation_aggregate_limit), {-1, I32_MAX}},
   };
   for (const auto& test_case : case_set) {
     const OptionDef<int32_t>& option_def = test_case.first;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index a0b7227..08b19c7 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -673,6 +673,26 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_max_mem_estimate_for_admission(bytes_limit);
         break;
       }
+      case TImpalaQueryOptions::THREAD_RESERVATION_LIMIT:
+      case TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT: {
+        // Parsing logic is identical for these two options.
+        StringParser::ParseResult status;
+        int val = StringParser::StringToInt<int>(value.c_str(), value.size(), &status);
+        if (status != StringParser::PARSE_SUCCESS) {
+          return Status(Substitute("Invalid thread count: '$0'.", value));
+        }
+        if (val < -1) {
+          return Status(Substitute("Invalid thread count: '$0'. "
+              "Only -1 and non-negative values are allowed.", val));
+        }
+        if (option == TImpalaQueryOptions::THREAD_RESERVATION_LIMIT) {
+          query_options->__set_thread_reservation_limit(val);
+        } else {
+          DCHECK_EQ(option, TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT);
+          query_options->__set_thread_reservation_aggregate_limit(val);
+        }
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 24eaed2..dafa0a0 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MAX_MEM_ESTIMATE_FOR_ADMISSION + 1);\
+      TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED,\
       TQueryOptionLevel::DEPRECATED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
@@ -137,6 +137,10 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(max_mem_estimate_for_admission, MAX_MEM_ESTIMATE_FOR_ADMISSION,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(thread_reservation_limit, THREAD_RESERVATION_LIMIT,\
+      TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(thread_reservation_aggregate_limit, THREAD_RESERVATION_AGGREGATE_LIMIT,\
+      TQueryOptionLevel::REGULAR)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 464fe87..2c00090 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -311,6 +311,12 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift.
   65: optional i64 max_mem_estimate_for_admission = 0;
+
+  // See comment in ImpalaService.thrift.
+  66: optional i32 thread_reservation_limit = 0;
+
+  // See comment in ImpalaService.thrift.
+  67: optional i32 thread_reservation_aggregate_limit = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index cdc5bd0..7b6afef 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -322,6 +322,15 @@ enum TImpalaQueryOptions {
   // workaround if the planner's memory estimate is too high and prevents a runnable
   // query from being admitted. 0 or -1 means this has no effect. Defaults to 0.
   MAX_MEM_ESTIMATE_FOR_ADMISSION,
+
+  // Admission control will reject queries when the number of reserved threads per backend
+  // for the query exceeds this number. 0 or -1 means this has no effect.
+  THREAD_RESERVATION_LIMIT,
+
+  // Admission control will reject queries when the total number of reserved threads
+  // across all backends for the query exceeds this number. 0 or -1 means this has no
+  // effect.
+  THREAD_RESERVATION_AGGREGATE_LIMIT,
 }
 
 // The summary of a DML statement.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
index 2ddf999..e658c09 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
@@ -19,8 +19,9 @@ minimum memory reservation needed is greater than pool max mem resources.
 set buffer_pool_limit=10mb;
 select distinct * from functional_parquet.alltypesagg
 ---- CATCH
-minimum memory reservation is greater than memory available to the query
- for buffer reservations. Increase the buffer_pool_limit to 68.09 MB.
+row_regex:.*minimum memory reservation on backend '.*'
+ is greater than memory available to the query
+ for buffer reservations\. Increase the buffer_pool_limit to 68.09 MB\.
 ====
 ---- QUERY
 set mem_limit=1024;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
index d16e9c0..fe61741 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
@@ -401,10 +401,10 @@ select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
     on a.month = b.id and b.int_col = -3
 ---- RESULTS
 ---- CATCH
-Rejected query from pool default-pool: minimum memory reservation is
- greater than memory available to the query for buffer reservations. Increase
- the buffer_pool_limit to 290.17 MB. See the query profile for more information
- about the per-node memory requirements.
+row_regex:.*Rejected query from pool default-pool: minimum memory reservation on
+ backend '.*' is greater than memory available to the query for buffer reservations\.
+ Increase the buffer_pool_limit to 290.17 MB\. See the query profile for more information
+ about the per-node memory requirements\.
 ====
 ---- QUERY
 # Confirm that with broadcast join, memory limit is not hit.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test b/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test
new file mode 100644
index 0000000..ad67c70
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/thread-limits.test
@@ -0,0 +1,104 @@
+====
+---- QUERY
+# Test per-backend limit. The coordinator will get 2 fragments + 1 scanner thread
+# scheduled on it.
+set thread_reservation_limit=2;
+select count(*) from alltypes
+---- CATCH
+row_regex:.*Rejected query from pool default-pool: thread reservation on backend '.*'
+ is greater than the THREAD_RESERVATION_LIMIT query option value: 3 > 2\.
+====
+---- QUERY
+# Test the boundary case where the thread reservation is exactly the required threads.
+set thread_reservation_limit=3;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+# Zero means no limit.
+set thread_reservation_limit=0;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+# -1 means no limit.
+set thread_reservation_limit=-1;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+# MT_DOP is factored into reservation.
+set thread_reservation_limit=3;
+set mt_dop=4;
+select count(*) from alltypes
+---- CATCH
+row_regex:.*Rejected query from pool default-pool: thread reservation on backend '.*'
+ is greater than the THREAD_RESERVATION_LIMIT query option value: 5 > 3\.
+====
+---- QUERY
+# Higher aggregate limit can reject based on sum of total threads. Assume >= 2 impalads
+# with scan ranges plus the coordinator fragment.
+set thread_reservation_aggregate_limit=3;
+select count(*) from alltypes
+---- CATCH
+row_regex:.*Rejected query from pool default-pool: sum of thread reservations across
+ all [0-9]+ backends is greater than the THREAD_RESERVATION_AGGREGATE_LIMIT query option
+ value: [0-9]+ > 3\.
+====
+---- QUERY
+# tpch_parquet.nation has only one file, which means only one instance of the scan fragment,
+# which means it only has 3 aggregate threads.
+set thread_reservation_aggregate_limit=3;
+select count(*) from tpch_parquet.nation
+---- TYPES
+BIGINT
+---- RESULTS
+25
+====
+---- QUERY
+# tpch_parquet.orders has two files, which means only more instances of the scan fragment,
+# which means it has more than 3 aggregate threads, assuming at least two impalads.
+set thread_reservation_aggregate_limit=3;
+select count(*) from tpch_parquet.orders
+---- CATCH
+row_regex:.*Rejected query from pool default-pool: sum of thread reservations across
+ all [0-9]+ backends is greater than the THREAD_RESERVATION_AGGREGATE_LIMIT query option
+ value: [0-9]+ > 3\.
+====
+---- QUERY
+# Running on a single impalad gets us under the aggregate limit.
+set num_nodes=1;
+set thread_reservation_aggregate_limit=3;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+# 0 means no limit.
+set thread_reservation_aggregate_limit=0;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
+---- QUERY
+# -1 means no limit.
+set thread_reservation_aggregate_limit=-1;
+select count(*) from alltypes
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
diff --git a/tests/query_test/test_resource_limits.py b/tests/query_test/test_resource_limits.py
new file mode 100644
index 0000000..c0fc7b5
--- /dev/null
+++ b/tests/query_test/test_resource_limits.py
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIfLocal
+from tests.common.test_dimensions import create_parquet_dimension
+
+
+class TestResourceLimits(ImpalaTestSuite):
+  """Test resource limit functionality."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestResourceLimits, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        create_parquet_dimension(cls.get_workload()))
+
+  @SkipIfLocal.multiple_impalad
+  def test_thread_limits(self, vector):
+    # Remove option from vector to allow test file to override it per query.
+    del vector.get_value('exec_option')['num_nodes']
+    self.run_test_case('QueryTest/thread-limits', vector)