You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/09 22:30:14 UTC

impala git commit: IMPALA-6034: Add scanned bytes limits per query

Repository: impala
Updated Branches:
  refs/heads/master 7f9a74ffc -> 3e17705ec


IMPALA-6034: Add scanned bytes limits per query

This adds support for aggregate resource limits at runtime, specified
via query options. If a query exceeds a limit it is terminated. The
checks are periodic so the query may go somewhat over the limits.

SCAN_BYTES_LIMIT is exposed as an advanced query option.

CPU_LIMIT_S is hidden as a development query option because it is flawed
- the CPU user/sys time is only updated upon thread completion, so in
many cases the limit will not take effect until well after the resources
have been used. IMPALA-7318 tracks enabling this.

Query profile is updated to include query wide and per backend metrics
for CPU and scanned bytes. Example from "select count(*) from
tpch_parquet.lineitem":

    Per Node Peak Memory Usage: tarmstrong-box:22000(289.50 KB) tarmstrong-box:22001(249.50 KB) tarmstrong-box:22002(249.50 KB)
    Per Node Bytes Read: tarmstrong-box:22000(100.00 KB) tarmstrong-box:22001(100.00 KB) tarmstrong-box:22002(100.00 KB)
    Per Node User Time: tarmstrong-box:22000(40.000ms) tarmstrong-box:22001(32.000ms) tarmstrong-box:22002(24.000ms)
    Per Node System Time: tarmstrong-box:22000(0.000ns) tarmstrong-box:22001(0.000ns) tarmstrong-box:22002(0.000ns)
     - FiltersReceived: 0 (0)
     - FinalizationTimer: 0.000ns
     - NumBackends: 3 (3)
     - NumFragmentInstances: 4 (4)
     - NumFragments: 2 (2)
     - TotalBytesRead: 300.00 KB (307200)
     - TotalCpuTime: 96.000ms

Testing:
Added tests for various permutations for CPU_LIMIT_S and
SCAN_BYTES_LIMIT

Based on a previous patch by Mostafa Mokhtar
<mm...@cloudera.com>

Change-Id: I3e85f80b70b3fce47e637e9322ed0316ee84f6a9
Reviewed-on: http://gerrit.cloudera.org:8080/11081
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3e17705e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3e17705e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3e17705e

Branch: refs/heads/master
Commit: 3e17705ecaba0b6ab9ae929e6c7c409e0b6aea1d
Parents: 7f9a74f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Aug 6 12:35:38 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 9 22:18:01 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.cc     | 56 ++++++++++---
 be/src/runtime/coordinator-backend-state.h      | 23 ++---
 be/src/runtime/coordinator.cc                   | 42 ++++++++--
 be/src/runtime/coordinator.h                    | 32 +++++++
 be/src/service/impala-server.cc                 | 88 +++++++++++++++++---
 be/src/service/impala-server.h                  | 11 ++-
 be/src/service/query-options-test.cc            | 39 +++++++--
 be/src/service/query-options.cc                 | 21 ++++-
 be/src/service/query-options.h                  |  5 +-
 common/thrift/ImpalaInternalService.thrift      |  6 ++
 common/thrift/ImpalaService.thrift              |  8 ++
 .../QueryTest/query-resource-limits.test        | 75 +++++++++++++++++
 tests/query_test/test_cancellation.py           |  6 ++
 tests/query_test/test_resource_limits.py        |  4 +
 www/query_backends.tmpl                         | 10 ++-
 15 files changed, 371 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index a99acdb..2d97b54 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -210,9 +210,35 @@ Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure,
   return status_;
 }
 
-int64_t Coordinator::BackendState::GetPeakConsumption() {
+Coordinator::ResourceUtilization Coordinator::BackendState::ComputeResourceUtilization() {
   lock_guard<mutex> l(lock_);
-  return peak_consumption_;
+  return ComputeResourceUtilizationLocked();
+}
+
+Coordinator::ResourceUtilization
+Coordinator::BackendState::ComputeResourceUtilizationLocked() {
+  ResourceUtilization result;
+  for (const auto& entry : instance_stats_map_) {
+    RuntimeProfile* profile = entry.second->profile_;
+    ResourceUtilization instance_utilization;
+    // Read this instance's counters from its profile and merge them into 'result'.
+    RuntimeProfile::Counter* user_time = profile->GetCounter("TotalThreadsUserTime");
+    if (user_time != nullptr) instance_utilization.cpu_user_ns = user_time->value();
+
+    RuntimeProfile::Counter* system_time = profile->GetCounter("TotalThreadsSysTime");
+    if (system_time != nullptr) instance_utilization.cpu_sys_ns = system_time->value();
+
+    for (RuntimeProfile::Counter* c : entry.second->bytes_read_counters_) {
+      instance_utilization.bytes_read += c->value();
+    }
+
+    RuntimeProfile::Counter* peak_mem =
+        profile->GetCounter(FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER);
+    if (peak_mem != nullptr)
+      instance_utilization.peak_per_host_mem_consumption = peak_mem->value();
+    result.Merge(instance_utilization);
+  }
+  return result;
 }
 
 void Coordinator::BackendState::MergeErrorLog(ErrorLogMap* merged) {
@@ -258,11 +284,6 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
     if (instance_stats->done_) continue;
 
     instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress);
-    if (instance_stats->peak_mem_counter_ != nullptr) {
-      // protect against out-of-order status updates
-      peak_consumption_ =
-        max(peak_consumption_, instance_stats->peak_mem_counter_->value());
-    }
 
     // If a query is aborted due to an error encountered by a single fragment instance,
     // all other fragment instances will report a cancelled status; make sure not to mask
@@ -445,15 +466,17 @@ void Coordinator::BackendState::InstanceStats::InitCounters() {
     RuntimeProfile::Counter* c =
         p->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER);
     if (c != nullptr) scan_ranges_complete_counters_.push_back(c);
+
+    RuntimeProfile::Counter* bytes_read =
+        p->GetCounter(ScanNode::BYTES_READ_COUNTER);
+    if (bytes_read != nullptr) bytes_read_counters_.push_back(bytes_read);
   }
 
-  peak_mem_counter_ =
-      profile_->GetCounter(FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER);
 }
 
 void Coordinator::BackendState::InstanceStats::Update(
-    const TFragmentInstanceExecStatus& exec_status,
-    ExecSummary* exec_summary, ProgressUpdater* scan_range_progress) {
+    const TFragmentInstanceExecStatus& exec_status, ExecSummary* exec_summary,
+    ProgressUpdater* scan_range_progress) {
   last_report_time_ms_ = MonotonicMillis();
   if (exec_status.done) stopwatch_.Stop();
   profile_->Update(exec_status.profile);
@@ -579,10 +602,17 @@ void Coordinator::FragmentStats::AddExecStats() {
 
 void Coordinator::BackendState::ToJson(Value* value, Document* document) {
   lock_guard<mutex> l(lock_);
+  ResourceUtilization resource_utilization = ComputeResourceUtilizationLocked();
   value->AddMember("num_instances", fragments_.size(), document->GetAllocator());
   value->AddMember("done", IsDone(), document->GetAllocator());
-  value->AddMember(
-      "peak_mem_consumption", peak_consumption_, document->GetAllocator());
+  value->AddMember("peak_per_host_mem_consumption",
+      resource_utilization.peak_per_host_mem_consumption, document->GetAllocator());
+  value->AddMember("bytes_read", resource_utilization.bytes_read,
+      document->GetAllocator());
+  value->AddMember("cpu_user_s", resource_utilization.cpu_user_ns / 1e9,
+      document->GetAllocator());
+  value->AddMember("cpu_sys_s", resource_utilization.cpu_sys_ns / 1e9,
+      document->GetAllocator());
 
   string host = TNetworkAddressToString(impalad_address());
   Value val(host.c_str(), document->GetAllocator());

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index c51c16c..1154cd8 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -109,8 +109,9 @@ class Coordinator::BackendState {
   Status GetStatus(bool* is_fragment_failure = nullptr,
       TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT;
 
-  /// Return peak memory consumption.
-  int64_t GetPeakConsumption();
+  /// Return peak memory consumption and aggregated resource usage across all fragment
+  /// instances for this backend.
+  ResourceUtilization ComputeResourceUtilization();
 
   /// Merge the accumulated error log into 'merged'.
   void MergeErrorLog(ErrorLogMap* merged);
@@ -147,8 +148,9 @@ class Coordinator::BackendState {
     /// Updates 'this' with exec_status, the fragment instances' TExecStats in
     /// exec_summary, and 'progress_updater' with the number of newly completed scan
     /// ranges. Also updates the instance's avg profile.
-    void Update(const TFragmentInstanceExecStatus& exec_status,
-        ExecSummary* exec_summary, ProgressUpdater* scan_range_progress);
+    /// Caller must hold BackendState::lock_.
+    void Update(const TFragmentInstanceExecStatus& exec_status, ExecSummary* exec_summary,
+        ProgressUpdater* scan_range_progress);
 
     int per_fragment_instance_idx() const {
       return exec_params_.per_fragment_instance_idx;
@@ -191,14 +193,14 @@ class Coordinator::BackendState {
     /// SCAN_RANGES_COMPLETE_COUNTERs in profile_
     std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_;
 
-    /// PER_HOST_PEAK_MEM_COUNTER
-    RuntimeProfile::Counter* peak_mem_counter_ = nullptr;
+    /// Collection of BYTES_READ_COUNTERs of all scan nodes in this fragment instance.
+    std::vector<RuntimeProfile::Counter*> bytes_read_counters_;
 
     /// The current state of this fragment instance's execution. This gets serialized in
     /// ToJson() and is displayed in the debug webpages.
     TFInstanceExecState::type current_state_ = TFInstanceExecState::WAITING_FOR_EXEC;
 
-    /// Extracts scan_ranges_complete_counters_ and peak_mem_counter_ from profile_.
+    /// Extracts scan_ranges_complete_counters_ and bytes_read_counters_ from profile_.
     void InitCounters();
   };
 
@@ -250,10 +252,6 @@ class Coordinator::BackendState {
   /// successful.
   bool rpc_sent_ = false;
 
-  /// peak memory used for this query (value of that node's query memtracker's
-  /// peak_consumption()
-  int64_t peak_consumption_ = 0;
-
   /// Set in ApplyExecStatusReport(). Uses MonotonicMillis().
   int64_t last_report_time_ms_ = 0;
 
@@ -268,6 +266,9 @@ class Coordinator::BackendState {
 
   /// Return true if execution at this backend is done. Caller must hold lock_.
   bool IsDone() const;
+
+  /// Same as ComputeResourceUtilization() but caller must hold lock.
+  ResourceUtilization ComputeResourceUtilizationLocked();
 };
 
 /// Per fragment execution statistics.

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 8a3213e..99f0be5 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -730,13 +730,35 @@ void Coordinator::ComputeQuerySummary() {
     fragment_stats->AddExecStats();
   }
 
-  stringstream info;
+  stringstream mem_info, cpu_user_info, cpu_system_info, bytes_read_info;
+  ResourceUtilization total_utilization;
   for (BackendState* backend_state: backend_states_) {
-    info << TNetworkAddressToString(backend_state->impalad_address()) << "("
-         << PrettyPrinter::Print(backend_state->GetPeakConsumption(), TUnit::BYTES)
-         << ") ";
-  }
-  query_profile_->AddInfoString("Per Node Peak Memory Usage", info.str());
+    ResourceUtilization utilization = backend_state->ComputeResourceUtilization();
+    total_utilization.Merge(utilization);
+    string network_address = TNetworkAddressToString(
+        backend_state->impalad_address());
+    mem_info << network_address << "("
+             << PrettyPrinter::Print(utilization.peak_per_host_mem_consumption,
+                 TUnit::BYTES) << ") ";
+    bytes_read_info << network_address << "("
+                    << PrettyPrinter::Print(utilization.bytes_read, TUnit::BYTES) << ") ";
+    cpu_user_info << network_address << "("
+                  << PrettyPrinter::Print(utilization.cpu_user_ns, TUnit::TIME_NS)
+                  << ") ";
+    cpu_system_info << network_address << "("
+                    << PrettyPrinter::Print(utilization.cpu_sys_ns, TUnit::TIME_NS)
+                    << ") ";
+  }
+
+  COUNTER_SET(ADD_COUNTER(query_profile_, "TotalBytesRead", TUnit::BYTES),
+      total_utilization.bytes_read);
+  COUNTER_SET(ADD_COUNTER(query_profile_, "TotalCpuTime", TUnit::TIME_NS),
+      total_utilization.cpu_user_ns + total_utilization.cpu_sys_ns);
+
+  query_profile_->AddInfoString("Per Node Peak Memory Usage", mem_info.str());
+  query_profile_->AddInfoString("Per Node Bytes Read", bytes_read_info.str());
+  query_profile_->AddInfoString("Per Node User Time", cpu_user_info.str());
+  query_profile_->AddInfoString("Per Node System Time", cpu_system_info.str());
 }
 
 string Coordinator::GetErrorLog() {
@@ -775,6 +797,14 @@ void Coordinator::ReleaseAdmissionControlResources() {
   query_events_->MarkEvent("Released admission control resources");
 }
 
+Coordinator::ResourceUtilization Coordinator::ComputeQueryResourceUtilization() {
+  ResourceUtilization query_resource_utilization;
+  for (BackendState* backend_state: backend_states_) {
+    query_resource_utilization.Merge(backend_state->ComputeResourceUtilization());
+  }
+  return query_resource_utilization;
+}
+
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   shared_lock<shared_mutex> lock(filter_lock_);
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 0b0312c..4fb7e25 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -167,6 +167,38 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// instances running on each backend in a member named 'backend_instances'.
   void FInstanceStatsToJson(rapidjson::Document* document);
 
+  /// Struct to aggregate resource usage information at the finstance, backend and
+  /// query level.
+  struct ResourceUtilization {
+    /// Peak memory used for this query (value of the query memtracker's
+    /// peak_consumption()). At the finstance or backend level, this is the
+    /// peak value for that backend; at the query level, this is the max
+    /// peak value from any backend.
+    int64_t peak_per_host_mem_consumption = 0;
+
+    /// Total bytes read across all scan nodes.
+    int64_t bytes_read = 0;
+
+    /// Total user cpu consumed.
+    int64_t cpu_user_ns = 0;
+
+    /// Total system cpu consumed.
+    int64_t cpu_sys_ns = 0;
+
+    /// Merge utilization from 'other' into this.
+    void Merge(const ResourceUtilization& other) {
+      peak_per_host_mem_consumption =
+          std::max(peak_per_host_mem_consumption, other.peak_per_host_mem_consumption);
+      bytes_read += other.bytes_read;
+      cpu_user_ns += other.cpu_user_ns;
+      cpu_sys_ns += other.cpu_sys_ns;
+    }
+  };
+
+  /// Aggregate resource utilization for the query (i.e. across all backends based on the
+  /// latest status reports received from those backends).
+  ResourceUtilization ComputeQueryResourceUtilization();
+
  private:
   class BackendState;
   struct FilterTarget;

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 34293f6..cc8465f 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -240,6 +240,9 @@ const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
 const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
 const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
 
+// Interval between checks for query expiration.
+const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;
+
 // Work item for ImpalaServer::cancellation_thread_pool_.
 class CancellationWork {
  public:
@@ -1061,7 +1064,10 @@ Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state,
     idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
   }
   int32_t exec_time_limit_s = request_state->query_options().exec_time_limit_s;
-  if (idle_timeout_s > 0 || exec_time_limit_s > 0) {
+  int64_t cpu_limit_s = request_state->query_options().cpu_limit_s;
+  int64_t scan_bytes_limit = request_state->query_options().scan_bytes_limit;
+  if (idle_timeout_s > 0 || exec_time_limit_s > 0 ||
+        cpu_limit_s > 0 || scan_bytes_limit > 0) {
     lock_guard<mutex> l2(query_expiration_lock_);
     int64_t now = UnixMillis();
     if (idle_timeout_s > 0) {
@@ -1076,6 +1082,18 @@ Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state,
       queries_by_timestamp_.emplace(ExpirationEvent{
           now + (1000L * exec_time_limit_s), query_id, ExpirationKind::EXEC_TIME_LIMIT});
     }
+    if (cpu_limit_s > 0 || scan_bytes_limit > 0) {
+      if (cpu_limit_s > 0) {
+        VLOG_QUERY << "Query " << PrintId(query_id) << " has CPU limit of "
+                   << PrettyPrinter::Print(cpu_limit_s, TUnit::TIME_S);
+      }
+      if (scan_bytes_limit > 0) {
+        VLOG_QUERY << "Query " << PrintId(query_id) << " has scan bytes limit of "
+                   << PrettyPrinter::Print(scan_bytes_limit, TUnit::BYTES);
+      }
+      queries_by_timestamp_.emplace(ExpirationEvent{
+          now + EXPIRATION_CHECK_INTERVAL_MS, query_id, ExpirationKind::RESOURCE_LIMIT});
+    }
   }
   return Status::OK();
 }
@@ -1952,7 +1970,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
 
 [[noreturn]] void ImpalaServer::ExpireQueries() {
   while (true) {
-    // The following block accomplishes three things:
+    // The following block accomplishes four things:
     //
     // 1. Update the ordered list of queries by checking the 'idle_time' parameter in
     // client_request_state. We are able to avoid doing this for *every* query in flight
@@ -1967,6 +1985,8 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
     // 3. Compute the next time a query *might* expire, so that the sleep at the end of
     // this loop has an accurate duration to wait. If the list of queries is empty, the
     // default sleep duration is half the idle query timeout.
+    //
+    // 4. Cancel queries with CPU or scan bytes limits if a limit is exceeded.
     int64_t now;
     {
       lock_guard<mutex> l(query_expiration_lock_);
@@ -1976,24 +1996,38 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
         // 'queries_by_timestamp_' is stored in ascending order of deadline so we can
         // break out of the loop and sleep as soon as we see a deadline in the future.
         if (expiration_event->deadline > now) break;
-        shared_ptr<ClientRequestState> query_state =
+        shared_ptr<ClientRequestState> crs =
             GetClientRequestState(expiration_event->query_id);
-        if (query_state == nullptr || query_state->is_expired()) {
+        if (crs == nullptr || crs->is_expired()) {
           // Query was deleted or expired already from a previous expiration event.
           expiration_event = queries_by_timestamp_.erase(expiration_event);
           continue;
         }
 
+        // Check for CPU and scanned bytes limits
+        if (expiration_event->kind == ExpirationKind::RESOURCE_LIMIT) {
+          Status resource_status = CheckResourceLimits(crs.get());
+          if (resource_status.ok()) {
+            queries_by_timestamp_.emplace(
+                ExpirationEvent{now + EXPIRATION_CHECK_INTERVAL_MS,
+                    expiration_event->query_id, ExpirationKind::RESOURCE_LIMIT});
+          } else {
+            ExpireQuery(crs.get(), resource_status);
+          }
+          expiration_event = queries_by_timestamp_.erase(expiration_event);
+          continue;
+        }
+
         // If the query time limit expired, we must cancel the query.
         if (expiration_event->kind == ExpirationKind::EXEC_TIME_LIMIT) {
-          int32_t exec_time_limit_s = query_state->query_options().exec_time_limit_s;
+          int32_t exec_time_limit_s = crs->query_options().exec_time_limit_s;
           VLOG_QUERY << "Expiring query " << PrintId(expiration_event->query_id)
                      << " due to execution time limit of " << exec_time_limit_s << "s.";
           const string& err_msg = Substitute(
               "Query $0 expired due to execution time limit of $1",
               PrintId(expiration_event->query_id),
               PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S));
-          ExpireQuery(query_state.get(), Status::Expected(err_msg));
+          ExpireQuery(crs.get(), Status::Expected(err_msg));
           expiration_event = queries_by_timestamp_.erase(expiration_event);
           continue;
         }
@@ -2003,14 +2037,14 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
         // Now check to see if the idle timeout has expired. We must check the actual
         // expiration time in case the query has updated 'last_active_ms' since the last
         // time we looked.
-        int32_t idle_timeout_s = query_state->query_options().query_timeout_s;
+        int32_t idle_timeout_s = crs->query_options().query_timeout_s;
         if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
           idle_timeout_s = min(FLAGS_idle_query_timeout, idle_timeout_s);
         } else {
           // Use a non-zero timeout, if one exists
           idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
         }
-        int64_t expiration = query_state->last_active_ms() + (idle_timeout_s * 1000L);
+        int64_t expiration = crs->last_active_ms() + (idle_timeout_s * 1000L);
         if (now < expiration) {
           // If the real expiration date is in the future we may need to re-insert the
           // query's expiration event at its correct location.
@@ -2026,17 +2060,17 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
             queries_by_timestamp_.emplace(ExpirationEvent{
                 expiration, query_id, ExpirationKind::IDLE_TIMEOUT});
           }
-        } else if (!query_state->is_active()) {
+        } else if (!crs->is_active()) {
           // Otherwise time to expire this query
           VLOG_QUERY
               << "Expiring query due to client inactivity: "
               << PrintId(expiration_event->query_id) << ", last activity was at: "
-              << ToStringFromUnixMillis(query_state->last_active_ms());
+              << ToStringFromUnixMillis(crs->last_active_ms());
           const string& err_msg = Substitute(
               "Query $0 expired due to client inactivity (timeout is $1)",
               PrintId(expiration_event->query_id),
               PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S));
-          ExpireQuery(query_state.get(), Status::Expected(err_msg));
+          ExpireQuery(crs.get(), Status::Expected(err_msg));
           expiration_event = queries_by_timestamp_.erase(expiration_event);
         } else {
           // Iterator is moved on in every other branch.
@@ -2048,8 +2082,38 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
     // could expire is in 1s time. An existing query may expire sooner, but we are
     // comfortable with a maximum error of 1s as a trade-off for not frequently waking
     // this thread.
-    SleepForMs(1000L);
+    SleepForMs(EXPIRATION_CHECK_INTERVAL_MS);
+  }
+}
+
+Status ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
+  Coordinator* coord = crs->GetCoordinator();
+  // Coordinator may be null if query has not started executing, check again later.
+  if (coord == nullptr) return Status::OK();
+  Coordinator::ResourceUtilization utilization = coord->ComputeQueryResourceUtilization();
+
+  // CPU time consumed so far. NOTE(review): cpu_limit_s * 1e9 can overflow int64_t.
+  int64_t cpu_time_ns = utilization.cpu_sys_ns + utilization.cpu_user_ns;
+  int64_t cpu_limit_s = crs->query_options().cpu_limit_s;
+  int64_t cpu_limit_ns = cpu_limit_s * 1000'000'000L;
+  if (cpu_limit_ns > 0 && cpu_time_ns > cpu_limit_ns) {
+    const string& err_msg = Substitute("Query $0 terminated due to CPU limit of $1",
+        PrintId(crs->query_id()), PrettyPrinter::Print(cpu_limit_s, TUnit::TIME_S));
+    VLOG_QUERY << err_msg;
+    return Status::Expected(err_msg);
   }
+
+  int64_t scan_bytes = utilization.bytes_read;
+  int64_t scan_bytes_limit = crs->query_options().scan_bytes_limit;
+  if (scan_bytes_limit > 0 && scan_bytes > scan_bytes_limit) {
+    const string& err_msg = Substitute(
+        "Query $0 terminated due to scan bytes limit of $1", PrintId(crs->query_id()),
+        PrettyPrinter::Print(scan_bytes_limit, TUnit::BYTES));
+    VLOG_QUERY << err_msg;
+    return Status::Expected(err_msg);
+  }
+  // Query is within the resource limits, check again later.
+  return Status::OK();
 }
 
 void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 8b7af09..14c33d1 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -831,6 +831,12 @@ class ImpalaServer : public ImpalaServiceIf,
   /// FLAGS_idle_query_timeout seconds.
   [[noreturn]] void ExpireQueries();
 
+  /// Called from ExpireQueries() to check query resource limits for 'crs'. If the query
+  /// exceeded a resource limit, returns a non-OK status with information about what
+  /// limit was exceeded. Returns OK if the query will continue running and expiration
+  /// check should be rescheduled for a later time.
+  Status CheckResourceLimits(ClientRequestState* crs);
+
   /// Expire 'crs' and cancel it with status 'status'.
   void ExpireQuery(ClientRequestState* crs, const Status& status);
 
@@ -1065,7 +1071,10 @@ class ImpalaServer : public ImpalaServiceIf,
     IDLE_TIMEOUT,
     // A hard time limit on query execution. The query is cancelled if this event occurs
     // before the query finishes.
-    EXEC_TIME_LIMIT
+    EXEC_TIME_LIMIT,
+    // A periodic check of the CPU and scanned bytes limits. The query is cancelled if a
+    // limit is exceeded when this event fires; otherwise the event is rescheduled.
+    RESOURCE_LIMIT,
   };
 
   // Describes a query expiration event where the query identified by 'query_id' is

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index b9bda60..114d830 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -135,16 +135,17 @@ void TestByteCaseSet(TQueryOptions& options,
 TEST(QueryOptions, SetByteOptions) {
   TQueryOptions options;
   // key and its valid range: [(key, (min, max))]
-  vector<pair<OptionDef<int64_t>, Range<int64_t>>> case_set_i64 {
-      {MAKE_OPTIONDEF(mem_limit),             {-1, I64_MAX}},
+  vector<pair<OptionDef<int64_t>, Range<int64_t>>> case_set_i64{
+      {MAKE_OPTIONDEF(mem_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(max_scan_range_length), {-1, I64_MAX}},
-      {MAKE_OPTIONDEF(buffer_pool_limit),     {-1, I64_MAX}},
-      {MAKE_OPTIONDEF(max_row_size),          {1, ROW_SIZE_LIMIT}},
-      {MAKE_OPTIONDEF(parquet_file_size),     {-1, I32_MAX}},
+      {MAKE_OPTIONDEF(buffer_pool_limit), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(max_row_size), {1, ROW_SIZE_LIMIT}},
+      {MAKE_OPTIONDEF(parquet_file_size), {-1, I32_MAX}},
       {MAKE_OPTIONDEF(compute_stats_min_sample_size), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(max_mem_estimate_for_admission), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(scan_bytes_limit), {-1, I64_MAX}},
   };
-  vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32 {
+  vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),
           {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
               RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
@@ -154,8 +155,7 @@ TEST(QueryOptions, SetByteOptions) {
           {8 * 1024, RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
       {MAKE_OPTIONDEF(runtime_bloom_filter_size),
           {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
-              RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}}
-  };
+              RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}}};
   TestByteCaseSet(options, case_set_i64);
   TestByteCaseSet(options, case_set_i32);
 }
@@ -245,6 +245,29 @@ TEST(QueryOptions, SetIntOptions) {
   }
 }
 
+// Test 64-bit integer options, each with a valid [lower, upper] range.
+TEST(QueryOptions, SetBigIntOptions) {
+  TQueryOptions options;
+  // List of pairs of Key and its valid range
+  pair<OptionDef<int64_t>, Range<int64_t>> case_set[] {
+      {MAKE_OPTIONDEF(cpu_limit_s),  {0, I64_MAX}},
+  };
+  for (const auto& test_case : case_set) {
+    const OptionDef<int64_t>& option_def = test_case.first;
+    const Range<int64_t>& range = test_case.second;
+    auto TestOk = MakeTestOkFn(options, option_def);
+    auto TestError = MakeTestErrFn(options, option_def);
+    TestError("1M");
+    TestError("0B");
+    TestError("1%");
+    TestOk(to_string(range.lower_bound).c_str(), range.lower_bound);
+    TestOk(to_string(range.upper_bound).c_str(), range.upper_bound);
+    TestError(to_string(int64_t(range.lower_bound) - 1).c_str());
+    // 2^63 is I64_MAX + 1.
+    TestError("9223372036854775808");
+  }
+}
+
 // Test options with non regular validation rule
 TEST(QueryOptions, SetSpecialOptions) {
   // REPLICA_PREFERENCE has unsettable enum values: cache_rack(1) & disk_rack(3)

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index e401553..1424896 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -674,7 +674,8 @@ Status impala::SetQueryOption(const string& key, const string& value,
           query_options->__set_kudu_read_mode(TKuduReadMode::READ_AT_SNAPSHOT);
         } else {
           return Status(Substitute("Invalid kudu_read_mode '$0'. Valid values are "
-              "DEFAULT, READ_LATEST, and READ_AT_SNAPSHOT.", value));
+                                   "DEFAULT, READ_LATEST, and READ_AT_SNAPSHOT.",
+              value));
         }
         break;
       }
@@ -695,6 +696,24 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_timezone(timezone);
         break;
       }
+      case TImpalaQueryOptions::SCAN_BYTES_LIMIT: {
+        int64_t bytes_limit;
+        RETURN_IF_ERROR(ParseMemValue(value, "query scan bytes limit", &bytes_limit));
+        query_options->__set_scan_bytes_limit(bytes_limit);
+        break;
+      }
+      case TImpalaQueryOptions::CPU_LIMIT_S: {
+        StringParser::ParseResult result;
+        const int64_t cpu_limit_s =
+            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || cpu_limit_s < 0) {
+          return Status(
+              Substitute("Invalid CPU limit: '$0'. "
+                         "Only non-negative numbers are allowed.", value));
+        }
+        query_options->__set_cpu_limit_s(cpu_limit_s);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 08749b8..46cdf05 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::TIMEZONE + 1);\
+      TImpalaQueryOptions::CPU_LIMIT_S + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -140,6 +140,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(allow_erasure_coded_files, ALLOW_ERASURE_CODED_FILES,\
       TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(timezone, TIMEZONE, TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(scan_bytes_limit, SCAN_BYTES_LIMIT,\
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(cpu_limit_s, CPU_LIMIT_S, TQueryOptionLevel::DEVELOPMENT)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 80da2a9..5367a6e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -298,6 +298,12 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift.
   70: optional string timezone = ""
+
+  // See comment in ImpalaService.thrift.
+  71: optional i64 scan_bytes_limit = 0;
+
+  // See comment in ImpalaService.thrift.
+  72: optional i64 cpu_limit_s = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 5d8260e..05a1431 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -331,6 +331,14 @@ enum TImpalaQueryOptions {
   // The timezone used in UTC<->localtime conversions. The default is the OS's timezone
   // at the coordinator, which can be overridden by environment variable $TZ.
   TIMEZONE,
+
+  // Scan bytes limit, after which a query will be terminated with an error.
+  SCAN_BYTES_LIMIT,
+
+  // CPU time limit in seconds, after which a query will be terminated with an error.
+  // Note that until IMPALA-7318 is fixed, CPU usage can be very stale and this may not
+  // terminate queries soon enough.
+  CPU_LIMIT_S,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
new file mode 100644
index 0000000..e3cd401
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
@@ -0,0 +1,75 @@
+====
+---- QUERY
+# Query should succeed.
+set CPU_LIMIT_S=10000;
+set SCAN_BYTES_LIMIT="10G";
+select id from functional.alltypessmall  order by 1 limit 1
+---- TYPES
+INT
+---- RESULTS
+0
+====
+---- QUERY
+# Query should fail due to exceeding scan bytes limit.
+set SCAN_BYTES_LIMIT="100M";
+select count(*) from tpch.lineitem l1,tpch.lineitem l2, tpch.lineitem l3 where
+            l1.l_suppkey = l2.l_linenumber and l1.l_orderkey = l2.l_orderkey and
+            l1.l_orderkey = l3.l_orderkey group by l1.l_comment, l2.l_comment
+            having count(*) = 99
+---- CATCH
+row_regex:.*terminated due to scan bytes limit of 100.00 M.*
+====
+---- QUERY
+# Query should fail due to CPU time limit.
+set CPU_LIMIT_S=1;
+select count(*) from tpch.lineitem l1,tpch.lineitem l2, tpch.lineitem l3 where
+            l1.l_suppkey = l2.l_linenumber and l1.l_orderkey = l2.l_orderkey and
+            l1.l_orderkey = l3.l_orderkey group by l1.l_comment, l2.l_comment
+            having count(*) = 99
+---- CATCH
+row_regex:.*terminated due to CPU limit of 1s000ms.*
+====
+---- QUERY
+# Query should fail due to CPU time limit even with a much higher scan bytes limit set.
+set CPU_LIMIT_S=1;
+set SCAN_BYTES_LIMIT="100G";
+select count(*) from tpch.lineitem l1,tpch.lineitem l2, tpch.lineitem l3 where
+            l1.l_suppkey = l2.l_linenumber and l1.l_orderkey = l2.l_orderkey and
+            l1.l_orderkey = l3.l_orderkey group by l1.l_comment, l2.l_comment
+            having count(*) = 99
+---- CATCH
+row_regex:.*terminated due to CPU limit of 1s000ms.*
+====
+---- QUERY
+# Query should fail due to exceeding the execution time limit (EXEC_TIME_LIMIT_S).
+set EXEC_TIME_LIMIT_S=2;
+set CPU_LIMIT_S=10000;
+set SCAN_BYTES_LIMIT="100G";
+select count(*) from tpch.lineitem l1,tpch.lineitem l2, tpch.lineitem l3 where
+            l1.l_suppkey = l2.l_linenumber and l1.l_orderkey = l2.l_orderkey and
+            l1.l_orderkey = l3.l_orderkey group by l1.l_comment, l2.l_comment
+            having count(*) = 99
+---- CATCH
+row_regex:.*expired due to execution time limit of 2s000ms.*
+====
+---- QUERY
+# Bytes limit not enforced for Kudu yet.
+set SCAN_BYTES_LIMIT="1k";
+select min(l_orderkey) from tpch_kudu.lineitem
+---- TYPES
+BIGINT
+---- RESULTS
+1
+====
+---- QUERY
+# Bytes limit enforced for HBase.
+# Add a sleep to slow down query and avoid race with bytes check.
+set SCAN_BYTES_LIMIT="1k";
+select count(*)
+from (
+  select distinct *
+  from functional_hbase.alltypesagg
+  where sleep(100) = id) v
+---- CATCH
+row_regex:.*terminated due to scan bytes limit of 1.00 K.*
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/tests/query_test/test_cancellation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index a63a628..ecf3126 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -53,6 +53,9 @@ WAIT_ACTIONS = [None, '0:GETNEXT:WAIT']
 # Verify that failed CancelFInstances() RPCs don't lead to hung queries
 FAIL_RPC_ACTIONS = [None, 'COORD_CANCEL_QUERY_FINSTANCES_RPC:FAIL']
 
+# Test cancelling when there is a resource limit.
+CPU_LIMIT_S = [0, 100000]
+
 # Verify close rpc running concurrently with fetch rpc. The two cases verify:
 # False: close and fetch rpc run concurrently.
 # True: cancel rpc is enough to ensure that the fetch rpc is unblocked.
@@ -85,6 +88,8 @@ class TestCancellation(ImpalaTestSuite):
         ImpalaTestDimension('join_before_close', *JOIN_BEFORE_CLOSE))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('buffer_pool_limit', 0))
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('cpu_limit_s', *CPU_LIMIT_S))
 
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('query_type') != 'CTAS' or (\
@@ -138,6 +143,7 @@ class TestCancellation(ImpalaTestSuite):
 
     vector.get_value('exec_option')['buffer_pool_limit'] =\
         vector.get_value('buffer_pool_limit')
+    vector.get_value('exec_option')['cpu_limit_s'] = vector.get_value('cpu_limit_s')
 
     # Execute the query multiple times, cancelling it each time.
     for i in xrange(NUM_CANCELATION_ITERATIONS):

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/tests/query_test/test_resource_limits.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_resource_limits.py b/tests/query_test/test_resource_limits.py
index 58e2ce6..be0895b 100644
--- a/tests/query_test/test_resource_limits.py
+++ b/tests/query_test/test_resource_limits.py
@@ -38,3 +38,7 @@ class TestResourceLimits(ImpalaTestSuite):
     # Remove option from vector to allow test file to override it per query.
     del vector.get_value('exec_option')['num_nodes']
     self.run_test_case('QueryTest/thread-limits', vector)
+
+  @SkipIfLocal.multiple_impalad
+  def test_resource_limits(self, vector):
+    self.run_test_case('QueryTest/query-resource-limits', vector)

http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/www/query_backends.tmpl
----------------------------------------------------------------------
diff --git a/www/query_backends.tmpl b/www/query_backends.tmpl
index c1833c6..fedae37 100644
--- a/www/query_backends.tmpl
+++ b/www/query_backends.tmpl
@@ -36,7 +36,10 @@ under the License.
       <th>Num. instances</th>
       <th>Num. remaining instances</th>
       <th>Done</th>
-      <th>Peak mem. consumption</th>
+      <th>Peak per-host mem. consumption</th>
+      <th>Scan bytes read</th>
+      <th>User CPU(s)</th>
+      <th>Kernel CPU(s)</th>
       <th>Time since last report (ms)</th>
     </tr>
   </thead>
@@ -64,7 +67,10 @@ $(document).ready(function() {
                      {data: 'num_instances'},
                      {data: 'num_remaining_instances'},
                      {data: 'done'},
-                     {data: 'peak_mem_consumption', render: renderSize},
+                     {data: 'peak_per_host_mem_consumption', render: renderSize},
+                     {data: 'bytes_read', render: renderSize},
+                     {data: 'cpu_user_s'},
+                     {data: 'cpu_sys_s'},
                      {data: 'time_since_last_heard_from'}],
         "order": [[ 0, "desc" ]],
         "pageLength": 100