You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/02/06 05:18:20 UTC

[impala] 01/08: IMPALA-7694: Add host resource usage metrics to profile

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b5714097e096c6e4b0573a7b326789807a1e4e5f
Author: Lars Volker <lv...@cloudera.com>
AuthorDate: Tue Nov 13 20:50:41 2018 -0800

    IMPALA-7694: Add host resource usage metrics to profile
    
    This change adds a mechanism to collect host resource usage metrics to
    profiles. Metric collection can be controlled through a new query option
    'RESOURCE_TRACE_RATIO'. It specifies the probability with which metrics
    collection will be enabled. Collection always happens per query for all
    executors that run one or more fragment instances of the query.
    
    This mechanism adds a new time series counter class that collects all
    measured values and does not re-sample them. It will re-sample values
    when printing them into a string profile, preserving up to 64 values,
    but Thrift profiles will contain the full list of values.
    
    We add a new section "Per Node Profiles" to the profile to store and
    show these values:
    
    Per Node Profiles:
      lv-desktop:22000:
        CpuIoWaitPercentage (500.000ms): 0, 0
        CpuSysPercentage (500.000ms): 1, 1
        CpuUserPercentage (500.000ms): 4, 0
          - ScratchBytesRead: 0
          - ScratchBytesWritten: 0
          - ScratchFileUsedBytes: 0
          - ScratchReads: 0 (0)
          - ScratchWrites: 0 (0)
          - TotalEncryptionTime: 0.000ns
          - TotalReadBlockTime: 0.000ns
    
    This change also uses the aforementioned mechanism to collect CPU usage
    metrics (user, system, and IO wait time).
    
    A future change can then add a tool to decode a Thrift profile and plot
    the contained usage metrics, e.g. using matplotlib (IMPALA-8123). Such a
    tool is not included in this change because it will require some
    reworking of the python dependencies.
    
    This change also includes a few minor improvements to make the resulting
    code more readable:
    - Extend the PeriodicCounterUpdater to call functions to update global
      metrics before updating the counters.
    - Expose the scratch profile within the per node resource usage section.
    - Improve documentation of the profile counter classes.
    - Remove synchronization from StreamingSampler.
    - Remove a few pieces of dead code that otherwise would have required
      updates.
    - Factor some code for profile decoding into the Impala python library
    
    Testing: This change contains a unit test for the system level metrics
    collection and e2e tests for the profile changes.
    
    Change-Id: I3aedc20c553ab8d7ed50f72a1a936eba151487d9
    Reviewed-on: http://gerrit.cloudera.org:8080/12069
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/kudu/util/logging.h                         |   2 +-
 be/src/runtime/coordinator-backend-state.cc        |  12 +-
 be/src/runtime/coordinator-backend-state.h         |  14 +-
 be/src/runtime/coordinator.cc                      |  10 +-
 be/src/runtime/coordinator.h                       |   5 +
 be/src/runtime/exec-env.cc                         |  11 ++
 be/src/runtime/exec-env.h                          |   8 +
 be/src/runtime/fragment-instance-state.cc          |   4 +-
 be/src/runtime/krpc-data-stream-recvr.cc           |   2 +-
 be/src/runtime/query-state.cc                      |  38 +++-
 be/src/runtime/query-state.h                       |   3 +
 be/src/runtime/runtime-state.cc                    |   7 +-
 be/src/service/impala-server.cc                    |   9 +
 be/src/service/impala-server.h                     |   5 +
 be/src/service/query-options.cc                    |  18 +-
 be/src/service/query-options.h                     |   3 +-
 be/src/util/CMakeLists.txt                         |   2 +
 be/src/util/periodic-counter-updater.cc            |  16 +-
 be/src/util/periodic-counter-updater.h             |  26 ++-
 be/src/util/pretty-printer.h                       |  10 +-
 be/src/util/runtime-profile-counters.h             | 133 +++++++++++---
 be/src/util/runtime-profile-test.cc                | 175 ++++++++++++++++++
 be/src/util/runtime-profile.cc                     | 199 +++++++++++++++++----
 be/src/util/runtime-profile.h                      |  80 +++++++--
 be/src/util/streaming-sampler.h                    |  60 +------
 be/src/util/system-state-info-test.cc              |  91 ++++++++++
 be/src/util/system-state-info.cc                   | 110 ++++++++++++
 be/src/util/system-state-info.h                    |  94 ++++++++++
 bin/parse-thrift-profile.py                        |  28 +--
 common/thrift/ImpalaInternalService.thrift         |   7 +
 common/thrift/ImpalaService.thrift                 |   7 +-
 common/thrift/Metrics.thrift                       |   3 +
 common/thrift/RuntimeProfile.thrift                |   7 +
 .../python/impala_py_lib/profiles.py               |  45 +----
 tests/beeswax/impala_beeswax.py                    |   9 +-
 tests/query_test/test_observability.py             | 130 +++++++++-----
 36 files changed, 1109 insertions(+), 274 deletions(-)

diff --git a/be/src/kudu/util/logging.h b/be/src/kudu/util/logging.h
index 428dadc..5aebaec 100644
--- a/be/src/kudu/util/logging.h
+++ b/be/src/kudu/util/logging.h
@@ -161,7 +161,7 @@ class ScopedDisableRedaction {
       &google::LogMessage::SendToLog).stream()
 
 #define KLOG_EVERY_N_SECS(severity, n_secs) \
-  static logging::LogThrottler LOG_THROTTLER;  \
+  static ::kudu::logging::LogThrottler LOG_THROTTLER;  \
   KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, LOG_THROTTLER, "no-tag")
 
 
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 5d79450..ba4f355 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -58,13 +58,16 @@ Coordinator::BackendState::BackendState(
 }
 
 void Coordinator::BackendState::Init(
-    const BackendExecParams& exec_params,
-    const vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool) {
+    const BackendExecParams& exec_params, const vector<FragmentStats*>& fragment_stats,
+    RuntimeProfile* host_profile_parent, ObjectPool* obj_pool) {
   backend_exec_params_ = &exec_params;
   host_ = backend_exec_params_->instance_params[0]->host;
   krpc_host_ = backend_exec_params_->instance_params[0]->krpc_host;
   num_remaining_instances_ = backend_exec_params_->instance_params.size();
 
+  host_profile_ = RuntimeProfile::Create(obj_pool, TNetworkAddressToString(host_));
+  host_profile_parent->AddChild(host_profile_);
+
   // populate instance_stats_map_ and install instance
   // profiles as child profiles in fragment_stats' profile
   int prev_fragment_idx = -1;
@@ -384,6 +387,11 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
   return IsDoneLocked(lock);
 }
 
+void Coordinator::BackendState::UpdateHostProfile(
+    const TRuntimeProfileTree& thrift_profile) {
+  host_profile_->Update(thrift_profile);
+}
+
 void Coordinator::BackendState::UpdateExecStats(
     const vector<FragmentStats*>& fragment_stats) {
   lock_guard<mutex> l(lock_);
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 15790f9..122da42 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -61,10 +61,12 @@ class Coordinator::BackendState {
 
   /// Creates InstanceStats for all instance in backend_exec_params in obj_pool
   /// and installs the instance profiles as children of the corresponding FragmentStats'
-  /// root profile.
+  /// root profile. Also creates a child profile below 'host_profile_parent' that contains
+  /// counters for the backend.
   /// Separated from c'tor to simplify future handling of out-of-mem errors.
   void Init(const BackendExecParams& backend_exec_params,
-      const std::vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool);
+      const std::vector<FragmentStats*>& fragment_stats,
+      RuntimeProfile* host_profile_parent, ObjectPool* obj_pool);
 
   /// Starts query execution at this backend by issuing an ExecQueryFInstances rpc and
   /// notifies on rpc_complete_barrier when the rpc completes. Success/failure is
@@ -87,6 +89,9 @@ class Coordinator::BackendState {
       const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
       ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state);
 
+  /// Merges the incoming 'thrift_profile' into this backend state's host profile.
+  void UpdateHostProfile(const TRuntimeProfileTree& thrift_profile);
+
   /// Update completion_times, rates, and avg_profile for all fragment_stats.
   void UpdateExecStats(const std::vector<FragmentStats*>& fragment_stats);
 
@@ -244,6 +249,11 @@ class Coordinator::BackendState {
   /// indices of fragments executing on this backend, populated in Init()
   std::unordered_set<int> fragments_;
 
+  /// Contains counters for the backend host that are not specific to a particular
+  /// fragment instance, e.g. global CPU utilization and scratch space usage.
+  /// Owned by coordinator object pool provided in the c'tor, created in Update().
+  RuntimeProfile* host_profile_ = nullptr;
+
   /// Thrift address of execution backend.
   TNetworkAddress host_;
   /// Krpc address of execution backend.
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 21da855..1579ada 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -99,6 +99,9 @@ Status Coordinator::Exec() {
   finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer");
   filter_updates_received_ = ADD_COUNTER(query_profile_, "FiltersReceived", TUnit::UNIT);
 
+  host_profiles_ = RuntimeProfile::Create(obj_pool(), "Per Node Profiles");
+  query_profile_->AddChild(host_profiles_);
+
   SCOPED_TIMER(query_profile_->total_time_counter());
 
   // initialize progress updater
@@ -207,7 +210,7 @@ void Coordinator::InitBackendStates() {
   for (const auto& entry: schedule_.per_backend_exec_params()) {
     BackendState* backend_state = obj_pool()->Add(
         new BackendState(*this, backend_idx, filter_mode_));
-    backend_state->Init(entry.second, fragment_stats_, obj_pool());
+    backend_state->Init(entry.second, fragment_stats_, host_profiles_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
   }
 }
@@ -699,6 +702,10 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
   }
   BackendState* backend_state = backend_states_[coord_state_idx];
 
+  if (thrift_profiles.__isset.host_profile) {
+    backend_state->UpdateHostProfile(thrift_profiles.host_profile);
+  }
+
   if (backend_state->ApplyExecStatusReport(request, thrift_profiles, &exec_summary_,
           &progress_, &dml_exec_state_)) {
     // This backend execution has completed.
@@ -816,6 +823,7 @@ void Coordinator::ComputeQuerySummary() {
   COUNTER_SET(ADD_COUNTER(query_profile_, "TotalCpuTime", TUnit::TIME_NS),
       total_utilization.cpu_user_ns + total_utilization.cpu_sys_ns);
 
+  // TODO(IMPALA-8126): Move to host profiles
   query_profile_->AddInfoString("Per Node Peak Memory Usage", mem_info.str());
   query_profile_->AddInfoString("Per Node Bytes Read", bytes_read_info.str());
   query_profile_->AddInfoString("Per Node User Time", cpu_user_info.str());
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index a559c14..b040120 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -263,6 +263,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Aggregate counters for the entire query. Lives in 'obj_pool_'. Set in Exec().
   RuntimeProfile* query_profile_ = nullptr;
 
+  /// Aggregate counters for backend host resource usage and other per-host information.
+  /// Will contain a child profile for each backend host that participates in the query
+  /// execution. Lives in 'obj_pool_'. Set in Exec().
+  RuntimeProfile* host_profiles_ = nullptr;
+
   /// Total time spent in finalization (typically 0 except for INSERT into hdfs
   /// tables). Set in Exec().
   RuntimeProfile::Counter* finalization_timer_ = nullptr;
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 0b0a889..db214f3 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -62,8 +62,10 @@
 #include "util/network-util.h"
 #include "util/openssl-util.h"
 #include "util/parse-util.h"
+#include "util/periodic-counter-updater.h"
 #include "util/pretty-printer.h"
 #include "util/test-info.h"
+#include "util/system-state-info.h"
 #include "util/thread-pool.h"
 #include "util/webserver.h"
 
@@ -269,6 +271,8 @@ Status ExecEnv::Init() {
   }
   InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit);
 
+  InitSystemStateInfo();
+
   RETURN_IF_ERROR(metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr));
   impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
   catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
@@ -500,6 +504,13 @@ void ExecEnv::InitMemTracker(int64_t bytes_limit) {
   }
 }
 
+void ExecEnv::InitSystemStateInfo() {
+  system_state_info_.reset(new SystemStateInfo());
+  PeriodicCounterUpdater::RegisterUpdateFunction([s = system_state_info_.get()]() {
+    s->CaptureSystemStateSnapshot();
+  });
+}
+
 TNetworkAddress ExecEnv::GetThriftBackendAddress() const {
   DCHECK(impala_server_ != nullptr);
   return impala_server_->GetThriftBackendAddress();
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 8a5a1f4..fc6f41d 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -61,6 +61,7 @@ class ReservationTracker;
 class RpcMgr;
 class Scheduler;
 class StatestoreSubscriber;
+class SystemStateInfo;
 class ThreadResourceMgr;
 class TmpFileMgr;
 class Webserver;
@@ -138,6 +139,7 @@ class ExecEnv {
   PoolMemTrackerRegistry* pool_mem_trackers() { return pool_mem_trackers_.get(); }
   ReservationTracker* buffer_reservation() { return buffer_reservation_.get(); }
   BufferPool* buffer_pool() { return buffer_pool_.get(); }
+  SystemStateInfo* system_state_info() { return system_state_info_.get(); }
 
   void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
 
@@ -207,6 +209,9 @@ class ExecEnv {
   boost::scoped_ptr<ReservationTracker> buffer_reservation_;
   boost::scoped_ptr<BufferPool> buffer_pool_;
 
+  /// Tracks system resource usage which we then include in profiles.
+  boost::scoped_ptr<SystemStateInfo> system_state_info_;
+
   /// Not owned by this class
   ImpalaServer* impala_server_ = nullptr;
   MetricGroup* rpc_metrics_ = nullptr;
@@ -260,6 +265,9 @@ class ExecEnv {
   /// Initialise 'mem_tracker_' with a limit of 'bytes_limit'. Must be called after
   /// InitBufferPool() and RegisterMemoryMetrics().
   void InitMemTracker(int64_t bytes_limit);
+
+  /// Initialize 'system_state_info_' to track system resource usage.
+  void InitSystemStateInfo();
 };
 
 } // namespace impala
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index c8ef699..c5799ee 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -162,11 +162,11 @@ Status FragmentInstanceState::Prepare() {
   avg_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
       bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
-  mem_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("MemoryUsage",
+  mem_usage_sampled_counter_ = profile()->AddSamplingTimeSeriesCounter("MemoryUsage",
       TUnit::BYTES,
       bind<int64_t>(mem_fn(&MemTracker::consumption),
           runtime_state_->instance_mem_tracker()));
-  thread_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("ThreadUsage",
+  thread_usage_sampled_counter_ = profile()->AddSamplingTimeSeriesCounter("ThreadUsage",
       TUnit::UNIT,
       bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 3bad645..04babfb 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -672,7 +672,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
   total_deferred_rpcs_counter_ =
       ADD_COUNTER(enqueue_profile_, "TotalRPCsDeferred", TUnit::UNIT);
   deferred_rpcs_time_series_counter_ =
-      enqueue_profile_->AddTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT,
+      enqueue_profile_->AddSamplingTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT,
       bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this));
   total_has_deferred_rpcs_timer_ =
       ADD_TIMER(enqueue_profile_, "TotalHasDeferredRPCsTime");
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 5779a0c..c0b5cc6 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -42,6 +42,7 @@
 #include "service/control-service.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
+#include "util/system-state-info.h"
 #include "util/thread.h"
 
 #include "gen-cpp/control_service.pb.h"
@@ -82,7 +83,8 @@ QueryState::QueryState(
     backend_resource_refcnt_(0),
     refcnt_(0),
     is_cancelled_(0),
-    query_spilled_(0) {
+    query_spilled_(0),
+    host_profile_(RuntimeProfile::Create(obj_pool(), "<track resource usage>")) {
   if (query_ctx_.request_pool.empty()) {
     // fix up pool name for tests
     DCHECK(!request_pool.empty());
@@ -130,6 +132,8 @@ QueryState::~QueryState() {
     // therefore be safely destroyed.
     query_mem_tracker_->CloseAndUnregisterFromParent();
   }
+  /// We started periodic counters that track the system resource usage in Init().
+  host_profile_->StopPeriodicCounters();
 }
 
 Status QueryState::Init(const TExecQueryFInstancesParams& exec_rpc_params) {
@@ -138,10 +142,28 @@ Status QueryState::Init(const TExecQueryFInstancesParams& exec_rpc_params) {
   // returns a resource refcount to its caller.
   AcquireBackendResourceRefcount();
 
+  ExecEnv* exec_env = ExecEnv::GetInstance();
+
+  // Initialize resource tracking counters.
+  if (query_ctx().trace_resource_usage) {
+    SystemStateInfo* system_state_info = exec_env->system_state_info();
+    host_profile_->AddChunkedTimeSeriesCounter(
+        "HostCpuUserPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
+        return system_state_info->GetCpuUsageRatios().user;
+        });
+    host_profile_->AddChunkedTimeSeriesCounter(
+        "HostCpuSysPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
+        return system_state_info->GetCpuUsageRatios().system;
+        });
+    host_profile_->AddChunkedTimeSeriesCounter(
+        "HostCpuIoWaitPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
+        return system_state_info->GetCpuUsageRatios().iowait;
+        });
+  }
+
   // Starting a new query creates threads and consumes a non-trivial amount of memory.
   // If we are already starved for memory, fail as early as possible to avoid consuming
   // more resources.
-  ExecEnv* exec_env = ExecEnv::GetInstance();
   MemTracker* process_mem_tracker = exec_env->process_mem_tracker();
   if (process_mem_tracker->LimitExceeded(MemLimit::HARD)) {
     string msg = Substitute(
@@ -202,14 +224,10 @@ Status QueryState::InitBufferPoolState() {
   buffer_reservation_->InitChildTracker(
       NULL, exec_env->buffer_reservation(), query_mem_tracker_, max_reservation);
 
-  // TODO: once there's a mechanism for reporting non-fragment-local profiles,
-  // should make sure to report this profile so it's not going into a black hole.
-  RuntimeProfile* dummy_profile = RuntimeProfile::Create(&obj_pool_, "dummy");
-  // Only create file group if spilling is enabled.
   if (query_options().scratch_limit != 0 && !query_ctx_.disable_spilling) {
     file_group_ = obj_pool_.Add(
         new TmpFileMgr::FileGroup(exec_env->tmp_file_mgr(), exec_env->disk_io_mgr(),
-            dummy_profile, query_id(), query_options().scratch_limit));
+            host_profile_, query_id(), query_options().scratch_limit));
   }
   return Status::OK();
 }
@@ -275,6 +293,12 @@ void QueryState::ConstructReport(bool instances_started,
     }
   }
 
+  // Add profile to report
+  host_profile_->ToThrift(&profiles_forest->host_profile);
+  profiles_forest->__isset.host_profile = true;
+  // Free resources in chunked counters in the profile
+  host_profile_->ClearChunkedTimeSeriesCounters();
+
   if (instances_started) {
     for (const auto& entry : fis_map_) {
       FragmentInstanceState* fis = entry.second;
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 6453d3e..74afb8a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -384,6 +384,9 @@ class QueryState {
   /// StartFInstances().
   int64_t fragment_events_start_time_ = 0;
 
+  /// Tracks host resource usage of this backend. Owned by 'obj_pool_', created in c'tor.
+  RuntimeProfile* const host_profile_;
+
   /// Create QueryState w/ a refcnt of 0 and a memory limit of 'mem_limit' bytes applied
   /// to the query mem tracker. The query is associated with the resource pool set in
   /// 'query_ctx.request_pool' or from 'request_pool', if the former is not set (needed
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index bcaa028..9ecaafb 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -70,11 +70,10 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
     fragment_ctx_(&fragment_ctx),
     instance_ctx_(&instance_ctx),
     now_(new TimestampValue(TimestampValue::Parse(query_state->query_ctx().now_string))),
-    utc_timestamp_(new TimestampValue(TimestampValue::Parse(
-        query_state->query_ctx().utc_timestamp_string))),
+    utc_timestamp_(new TimestampValue(
+        TimestampValue::Parse(query_state->query_ctx().utc_timestamp_string))),
     local_time_zone_(&TimezoneDatabase::GetUtcTimezone()),
-    profile_(RuntimeProfile::Create(
-          obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id))),
+    profile_(RuntimeProfile::Create(obj_pool(), "<fragment instance>")),
     instance_buffer_reservation_(new ReservationTracker) {
   Init();
 }
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index fa0c002..5b18cdd 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -48,6 +48,7 @@
 #include "exec/external-data-source-executor.h"
 #include "exprs/timezone_db.h"
 #include "gen-cpp/CatalogService_constants.h"
+#include "kudu/util/random_util.h"
 #include "rpc/authentication.h"
 #include "rpc/rpc-trace.h"
 #include "rpc/thrift-thread.h"
@@ -107,6 +108,7 @@ using boost::get_system_time;
 using boost::system_time;
 using boost::uuids::random_generator;
 using boost::uuids::uuid;
+using kudu::GetRandomSeed32;
 using namespace apache::hive::service::cli::thrift;
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
@@ -262,6 +264,8 @@ const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
 // Interval between checks for query expiration.
 const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;
 
+ThreadSafeRandom ImpalaServer::rng_(GetRandomSeed32());
+
 ImpalaServer::ImpalaServer(ExecEnv* exec_env)
     : exec_env_(exec_env),
       thrift_serializer_(false),
@@ -1018,6 +1022,11 @@ void ImpalaServer::PrepareQueryContext(const TNetworkAddress& backend_addr,
   // thread-safe).
   query_ctx->query_id = UuidToQueryId(random_generator()());
   GetThreadDebugInfo()->SetQueryId(query_ctx->query_id);
+
+  const double trace_ratio = query_ctx->client_request.query_options.resource_trace_ratio;
+  if (trace_ratio > 0 && rng_.NextDoubleFraction() < trace_ratio) {
+    query_ctx->__set_trace_resource_usage(true);
+  }
 }
 
 Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 981df74..dd7d221 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -32,6 +32,7 @@
 #include "gen-cpp/ImpalaHiveServer2Service.h"
 #include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/Frontend_types.h"
+#include "kudu/util/random.h"
 #include "rpc/thrift-server.h"
 #include "common/status.h"
 #include "service/query-options.h"
@@ -47,6 +48,7 @@
 #include "statestore/statestore-subscriber.h"
 
 namespace impala {
+using kudu::ThreadSafeRandom;
 
 class ExecEnv;
 class DataSink;
@@ -914,6 +916,9 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Background thread that does the shutdown.
   [[noreturn]] void ShutdownThread();
 
+  /// Random number generator for use in this class, thread safe.
+  static ThreadSafeRandom rng_;
+
   /// Guards query_log_ and query_log_index_
   boost::mutex query_log_lock_;
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index c3cf1e3..6f5bfce 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -75,13 +75,13 @@ void impala::OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMas
 // Choose different print function based on the type.
 // TODO: In thrift 0.11.0 operator << is implemented for enums and this indirection can be
 // removed.
-template<typename T, typename std::enable_if_t<std::is_enum<T>::value>* = nullptr>
+template <typename T, typename std::enable_if_t<std::is_enum<T>::value>* = nullptr>
 string PrintQueryOptionValue(const T& option) {
   return PrintThriftEnum(option);
 }
 
-template<typename T, typename std::enable_if_t<std::is_integral<T>::value>* = nullptr>
-string PrintQueryOptionValue(const T& option)  {
+template <typename T, typename std::enable_if_t<std::is_arithmetic<T>::value>* = nullptr>
+string PrintQueryOptionValue(const T& option) {
   return std::to_string(option);
 }
 
@@ -724,6 +724,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_client_identifier(value);
         break;
       }
+      case TImpalaQueryOptions::RESOURCE_TRACE_RATIO: {
+        StringParser::ParseResult result;
+        const double val =
+            StringParser::StringToFloat<double>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || val < 0 || val > 1) {
+          return Status(Substitute("Invalid resource trace ratio: '$0'. "
+                                   "Only values from 0 to 1 are allowed.",
+              value));
+        }
+        query_options->__set_resource_trace_ratio(val);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index f001ba4..b3276ff 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::CLIENT_IDENTIFIER + 1);\
+      TImpalaQueryOptions::RESOURCE_TRACE_RATIO + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -145,6 +145,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(cpu_limit_s, CPU_LIMIT_S, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(topn_bytes_limit, TOPN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(client_identifier, CLIENT_IDENTIFIER, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(resource_trace_ratio, RESOURCE_TRACE_RATIO, TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index e80560e..63bb0ba 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -79,6 +79,7 @@ add_library(Util
   string-parser.cc
   string-util.cc
   symbols-util.cc
+  system-state-info.cc
   static-asserts.cc
   summary-util.cc
   table-printer.cc
@@ -144,6 +145,7 @@ ADD_BE_LSAN_TEST(runtime-profile-test)
 ADD_BE_LSAN_TEST(string-parser-test)
 ADD_BE_LSAN_TEST(string-util-test)
 ADD_BE_LSAN_TEST(symbols-util-test)
+ADD_BE_LSAN_TEST(system-state-info-test)
 ADD_BE_LSAN_TEST(sys-info-test)
 ADD_BE_LSAN_TEST(thread-pool-test)
 ADD_BE_LSAN_TEST(time-test)
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index 098e683..8bef6aa 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -26,12 +26,12 @@ namespace posix_time = boost::posix_time;
 using boost::get_system_time;
 using boost::system_time;
 
-namespace impala {
-
 // Period to update rate counters and sampling counters in ms.
 DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and"
     " sampling counters in ms");
 
+namespace impala {
+
 PeriodicCounterUpdater* PeriodicCounterUpdater::instance_ = nullptr;
 
 void PeriodicCounterUpdater::Init() {
@@ -42,9 +42,14 @@ void PeriodicCounterUpdater::Init() {
       new thread(&PeriodicCounterUpdater::UpdateLoop, instance_));
 }
 
+void PeriodicCounterUpdater::RegisterUpdateFunction(UpdateFn update_fn) {
+  lock_guard<SpinLock> l(instance_->update_fns_lock_);
+  instance_->update_fns_.push_back(update_fn);
+}
+
 void PeriodicCounterUpdater::RegisterPeriodicCounter(
     RuntimeProfile::Counter* src_counter,
-    RuntimeProfile::DerivedCounterFunction sample_fn,
+    RuntimeProfile::SampleFunction sample_fn,
     RuntimeProfile::Counter* dst_counter, PeriodicCounterType type) {
   DCHECK(src_counter == NULL || sample_fn == NULL);
 
@@ -133,6 +138,11 @@ void PeriodicCounterUpdater::UpdateLoop() {
     int elapsed_ms = elapsed.total_milliseconds();
 
     {
+      lock_guard<SpinLock> l(update_fns_lock_);
+      for (UpdateFn& f : update_fns_) f();
+    }
+
+    {
       lock_guard<SpinLock> ratelock(instance_->rate_lock_);
       for (RateCounterMap::iterator it = rate_counters_.begin();
            it != rate_counters_.end(); ++it) {
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index 762f372..3c63d6d 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -34,7 +34,9 @@ namespace impala {
 /// metric (e.g. memory used) at regular intervals. The samples can be summarized in
 /// a few ways (e.g. averaged, stored as histogram, kept as a time series data, etc).
 /// This class has one thread that will wake up at a regular period and update all
-/// the registered counters.
+/// the registered counters. Optionally, users can register functions to be called before
+/// counters get updated, for example to update global metrics that the counters then
+/// pull from.
 /// Typically, the counter updates should be stopped as early as possible to prevent
 /// future stale samples from polluting the useful values.
 class PeriodicCounterUpdater {
@@ -44,17 +46,23 @@ class PeriodicCounterUpdater {
     SAMPLING_COUNTER,
   };
 
-  // Sets up data structures and starts the counter update thread. Should only be called
-  // once during process startup and must be called before other methods.
+  /// Sets up data structures and starts the counter update thread. Should only be called
+  /// once during process startup and must be called before other methods.
   static void Init();
 
+  typedef std::function<void()> UpdateFn;
+  /// Registers an update function that will be called before individual counters will be
+  /// updated. This can be used to update some global metric once before reading it
+  /// through individual counters.
+  static void RegisterUpdateFunction(UpdateFn update_fn);
+
   /// Registers a periodic counter to be updated by the update thread.
   /// Either sample_fn or dst_counter must be non-NULL.  When the periodic counter
   /// is updated, it either gets the value from the dst_counter or calls the sample
   /// function to get the value.
   /// dst_counter/sample fn is assumed to be compatible types with src_counter.
   static void RegisterPeriodicCounter(RuntimeProfile::Counter* src_counter,
-      RuntimeProfile::DerivedCounterFunction sample_fn,
+      RuntimeProfile::SampleFunction sample_fn,
       RuntimeProfile::Counter* dst_counter, PeriodicCounterType type);
 
   /// Adds a bucketing counter to be updated at regular intervals.
@@ -82,13 +90,13 @@ class PeriodicCounterUpdater {
  private:
   struct RateCounterInfo {
     RuntimeProfile::Counter* src_counter;
-    RuntimeProfile::DerivedCounterFunction sample_fn;
+    RuntimeProfile::SampleFunction sample_fn;
     int64_t elapsed_ms;
   };
 
   struct SamplingCounterInfo {
     RuntimeProfile::Counter* src_counter; // the counter to be sampled
-    RuntimeProfile::DerivedCounterFunction sample_fn;
+    RuntimeProfile::SampleFunction sample_fn;
     int64_t total_sampled_value; // sum of all sampled values;
     int64_t num_sampled; // number of samples taken
   };
@@ -106,6 +114,12 @@ class PeriodicCounterUpdater {
   /// Thread performing asynchronous updates.
   boost::scoped_ptr<boost::thread> update_thread_;
 
+  /// List of functions that will be called before individual counters will be sampled.
+  std::vector<UpdateFn> update_fns_;
+
+  /// Spinlock that protects the list of update functions (and their execution).
+  SpinLock update_fns_lock_;
+
   /// Spinlock that protects the map of rate counters
   SpinLock rate_lock_;
 
diff --git a/be/src/util/pretty-printer.h b/be/src/util/pretty-printer.h
index 6226eba..c44bc26 100644
--- a/be/src/util/pretty-printer.h
+++ b/be/src/util/pretty-printer.h
@@ -20,8 +20,9 @@
 
 #include <boost/algorithm/string.hpp>
 #include <cmath>
-#include <sstream>
 #include <iomanip>
+#include <limits>
+#include <sstream>
 
 #include "gen-cpp/RuntimeProfile_types.h"
 #include "util/cpu-info.h"
@@ -135,6 +136,13 @@ class PrettyPrinter {
         break;
       }
 
+      // Printed as integer values
+      case TUnit::BASIS_POINTS: {
+        DCHECK_LE(value, 10000);
+        ss << (value / 100);
+        break;
+      }
+
       default:
         DCHECK(false) << "Unsupported TUnit: " << value;
         break;
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 1d3d748..a9b3875 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #ifndef IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
 #define IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
 
@@ -33,6 +32,10 @@
 
 namespace impala {
 
+/// This file contains the declarations of various counters that can be used in runtime
+/// profiles. See the class-level comment for RuntimeProfile (runtime-profile.h) for an
+/// overview of what there is. When making changes, please also update that comment.
+
 /// Define macros for updating counters.  The macros make it very easy to disable
 /// all counters at compile time.  Set this to 0 to remove counters.  This is useful
 /// to do to make sure the counters aren't affecting the system.
@@ -45,7 +48,7 @@ namespace impala {
 #if ENABLE_COUNTERS
   #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
   #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
-      (profile)->AddTimeSeriesCounter(name, src_counter)
+      (profile)->AddSamplingTimeSeriesCounter(name, src_counter)
   #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
   #define ADD_SUMMARY_STATS_TIMER(profile, name) \
       (profile)->AddSummaryStatsCounter(name, TUnit::TIME_NS)
@@ -142,7 +145,7 @@ class RuntimeProfile::HighWaterMarkCounter : public RuntimeProfile::Counter {
 /// Do not call Set() and Add().
 class RuntimeProfile::DerivedCounter : public RuntimeProfile::Counter {
  public:
-  DerivedCounter(TUnit::type unit, const DerivedCounterFunction& counter_fn)
+  DerivedCounter(TUnit::type unit, const SampleFunction& counter_fn)
     : Counter(unit),
       counter_fn_(counter_fn) {}
 
@@ -151,7 +154,7 @@ class RuntimeProfile::DerivedCounter : public RuntimeProfile::Counter {
   }
 
  private:
-  DerivedCounterFunction counter_fn_;
+  SampleFunction counter_fn_;
 };
 
 /// An AveragedCounter maintains a set of counters and its value is the
@@ -384,41 +387,125 @@ class RuntimeProfile::EventSequence {
   int64_t offset_ = 0;
 };
 
-typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
+/// Abstract base for counters to capture a time series of values. Users can add samples
+/// to counters in periodic intervals, and the RuntimeProfile class will retrieve them by
+/// accessing the private interface. Methods are thread-safe where explicitly stated.
 class RuntimeProfile::TimeSeriesCounter {
  public:
-  std::string DebugString() const;
+  // Adds a sample. Thread-safe.
+  void AddSample(int ms_elapsed);
 
-  void AddSample(int ms_elapsed) {
-    int64_t sample = sample_fn_();
-    samples_.AddSample(sample, ms_elapsed);
+  // Returns a pointer do the sample data together with the number of samples and the
+  // sampling period. This method is not thread-safe and must only be used in tests.
+  const int64_t* GetSamplesTest(int* num_samples, int* period) {
+    return GetSamplesLockedForSend(num_samples, period);
   }
 
+  virtual ~TimeSeriesCounter() {}
+
  private:
   friend class RuntimeProfile;
 
-  TimeSeriesCounter(const std::string& name, TUnit::type unit,
-      DerivedCounterFunction fn)
-    : name_(name), unit_(unit), sample_fn_(fn) {
-  }
+  void ToThrift(TTimeSeriesCounter* counter);
 
-  /// Construct a time series object from existing sample data. This counter
-  /// is then read-only (i.e. there is no sample function).
-  TimeSeriesCounter(const std::string& name, TUnit::type unit, int period,
-      const std::vector<int64_t>& values)
-    : name_(name), unit_(unit), sample_fn_(), samples_(period, values) {
-  }
+  /// Adds a sample to the counter. Caller must hold lock_.
+  virtual void AddSampleLocked(int64_t value, int ms_elapsed) = 0;
 
-  void ToThrift(TTimeSeriesCounter* counter);
+  /// Returns a pointer to memory containing all samples of the counter. The caller must
+  /// hold lock_. The returned pointer is only valid while the caller holds lock_.
+  virtual const int64_t* GetSamplesLocked(int* num_samples, int* period) const = 0;
+
+  /// Returns a pointer to memory containing all samples of the counter and marks the
+  /// samples as retrieved, so that a subsequent call to Clear() can remove them. The
+  /// caller must hold lock_. The returned pointer is only valid while the caller holds
+  /// lock_.
+  virtual const int64_t* GetSamplesLockedForSend(int* num_samples, int* period);
+
+  /// Sets all internal samples. Thread-safe. Not implemented by all child classes. The
+  /// caller must make sure that this is only called on supported classes.
+  virtual void SetSamples(
+      int period, const std::vector<int64_t>& samples, int64_t start_idx);
+
+  /// Implemented by some child classes to clear internal sample buffers. No-op on other
+  /// child classes.
+  virtual void Clear() {}
+
+ protected:
+  TimeSeriesCounter(const std::string& name, TUnit::type unit,
+      SampleFunction fn = SampleFunction())
+    : name_(name), unit_(unit), sample_fn_(fn) {}
+
+  TUnit::type unit() const { return unit_; }
 
   std::string name_;
   TUnit::type unit_;
-  DerivedCounterFunction sample_fn_;
+  SampleFunction sample_fn_;
+  /// The number of samples that have been retrieved and cleared from this counter.
+  int64_t previous_sample_count_ = 0;
+  mutable SpinLock lock_;
+};
+
+typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
+class RuntimeProfile::SamplingTimeSeriesCounter
+    : public RuntimeProfile::TimeSeriesCounter {
+ private:
+  friend class RuntimeProfile;
+
+  SamplingTimeSeriesCounter(
+      const std::string& name, TUnit::type unit, SampleFunction fn)
+    : TimeSeriesCounter(name, unit, fn) {}
+
+  virtual void AddSampleLocked(int64_t sample, int ms_elapsed) override;
+  virtual const int64_t* GetSamplesLocked( int* num_samples, int* period) const override;
+
   StreamingCounterSampler samples_;
 };
 
-/// Counter whose value comes from an internal ConcurrentStopWatch to track multiple threads
-/// concurrent running time.
+/// Time series counter that supports piece-wise transmission of its samples.
+///
+/// This time series counter will capture samples into an internal unbounded buffer.
+/// The buffer can be reset to clear out values that have already been transmitted
+/// elsewhere.
+class RuntimeProfile::ChunkedTimeSeriesCounter
+    : public RuntimeProfile::TimeSeriesCounter {
+ public:
+  /// Clears the internal sample buffer and updates the number of samples that the counter
+  /// has seen in total so far.
+  virtual void Clear() override;
+
+ private:
+  friend class RuntimeProfile;
+
+  /// Constructs a time series counter that uses 'fn' to generate new samples. It's size
+  /// is bounded by the expected number of samples per status update times a constant
+  /// factor.
+  ChunkedTimeSeriesCounter(
+      const std::string& name, TUnit::type unit, SampleFunction fn);
+
+  /// Constructs a time series object from existing sample data. This counter is then
+  /// read-only (i.e. there is no sample function). This counter has no maximum size.
+  ChunkedTimeSeriesCounter(const std::string& name, TUnit::type unit, int period,
+      const std::vector<int64_t>& values)
+    : TimeSeriesCounter(name, unit), period_(period), values_(values), max_size_(0) {}
+
+  virtual void AddSampleLocked(int64_t value, int ms_elapsed) override;
+  virtual const int64_t* GetSamplesLocked(int* num_samples, int* period) const override;
+  virtual const int64_t* GetSamplesLockedForSend(int* num_samples, int* period) override;
+
+  virtual void SetSamples(
+      int period, const std::vector<int64_t>& samples, int64_t start_idx) override;
+
+  int period_ = 0;
+  std::vector<int64_t> values_;
+  // The number of values returned through the last call to GetSamplesLockedForSend().
+  int64_t last_get_count_ = 0;
+  // The maximum number of samples that can be stored in this counter. We drop samples at
+  // the front before appending new ones if we would exceed this count.
+  int64_t max_size_;
+};
+
+/// Counter whose value comes from an internal ConcurrentStopWatch to track concurrent
+/// running time for multiple threads.
 class RuntimeProfile::ConcurrentTimerCounter : public Counter {
  public:
   ConcurrentTimerCounter(TUnit::type unit) : Counter(unit) {}
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index 73553d2..87acb69 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -29,6 +29,9 @@
 
 #include "common/names.h"
 
+DECLARE_int32(status_report_interval_ms);
+DECLARE_int32(periodic_counter_update_period_ms);
+
 namespace impala {
 
 TEST(CountersTest, Basic) {
@@ -966,6 +969,178 @@ TEST(TimerCounterTest, CountersTestRandom) {
   ValidateLapTime(&tester, MonotonicStopWatch::Now() - lap_time_start);
 }
 
+
+TEST(TimeSeriesCounterTest, TestAddClearRace) {
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+  int i = 0;
+  // Return and increment i
+  auto f = [&i]() { return i++; };
+  RuntimeProfile::TimeSeriesCounter* counter =
+      profile->AddChunkedTimeSeriesCounter("Counter", TUnit::UNIT, f);
+  // Sleep 1 second for some values to accumulate.
+  sleep(1);
+  int num_samples, period;
+  counter->GetSamplesTest(&num_samples, &period);
+  EXPECT_GT(num_samples, 0);
+
+  // Wait for more values to show up
+  sleep(1);
+
+  // Stop the counters. The rest of the test assumes that no new values will be added.
+  profile->StopPeriodicCounters();
+
+  // Clear the counter
+  profile->ClearChunkedTimeSeriesCounters();
+
+  // Check that clearing multiple times doesn't affect valued that have not been
+  // retrieved.
+  profile->ClearChunkedTimeSeriesCounters();
+
+  // Make sure that it still has values in it.
+  counter->GetSamplesTest(&num_samples, &period);
+  EXPECT_GT(num_samples, 0);
+
+  // Clear it again
+  profile->ClearChunkedTimeSeriesCounters();
+
+  // Make sure the values are gone.
+  counter->GetSamplesTest(&num_samples, &period);
+  EXPECT_EQ(num_samples, 0);
+}
+
+/// Stops the periodic counter updater in 'profile' and then clears the samples in
+/// 'counter'.
+void StopAndClearCounter(RuntimeProfile* profile,
+    RuntimeProfile::TimeSeriesCounter* counter) {
+  // There's a race between adding the counter and calling StopPeriodicCounters so we
+  // sleep here to make sure we exercise the code that handles the race.
+  sleep(1);
+  profile->StopPeriodicCounters();
+
+  // Reset the counter state by reading and clearing its samples.
+  int num_samples = 0;
+  int result_period_unused = 0;
+  counter->GetSamplesTest(&num_samples, &result_period_unused);
+  ASSERT_GT(num_samples, 0);
+  profile->ClearChunkedTimeSeriesCounters();
+  // Ensure clean state.
+  counter->GetSamplesTest(&num_samples, &result_period_unused);
+  ASSERT_EQ(num_samples, 0);
+}
+
+/// Tests that ChunkedTimeSeriesCounters are bounded by a maximum size.
+TEST(TimeSeriesCounterTest, TestMaximumSize) {
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+
+  const int test_period = FLAGS_periodic_counter_update_period_ms;
+
+  // Add a counter with a sample function that counts up, starting from 0.
+  int value = 0;
+  auto sample_fn = [&value]() { return value++; };
+  RuntimeProfile::TimeSeriesCounter* counter =
+      profile->AddChunkedTimeSeriesCounter("TestCounter", TUnit::UNIT, sample_fn);
+
+  // Stop counter updates from interfering with the rest of the test.
+  StopAndClearCounter(profile, counter);
+
+  // Reset value after previous values have been retrieved.
+  value = 0;
+
+  int64_t max_size = 10 * FLAGS_status_report_interval_ms / test_period;
+  for (int i = 0; i < 10 + max_size; ++i) counter->AddSample(test_period);
+
+  int num_samples = 0;
+  int result_period = 0;
+  // Retrieve and validate samples.
+  const int64_t* samples = counter->GetSamplesTest(&num_samples, &result_period);
+  ASSERT_EQ(num_samples, max_size);
+  // No resampling happens with ChunkedTimeSeriesCounter.
+  ASSERT_EQ(result_period, test_period);
+
+  // First 10 samples have been truncated
+  ASSERT_EQ(samples[0], 10);
+}
+
+/// Test parameter class that helps to test time series resampling during profile pretty
+/// printing with a varying number of test samples.
+struct TimeSeriesTestParam {
+  TimeSeriesTestParam(int num_samples, vector<const char*> expected)
+    : num_samples(num_samples), expected(expected) {}
+  int num_samples;
+  vector<const char*> expected;
+
+  // Used by gtest to print values of this struct
+  friend std::ostream& operator<<(std::ostream& os, const TimeSeriesTestParam& p) {
+    return os << "num_samples: " << p.num_samples << endl;
+  }
+};
+
+class TimeSeriesCounterResampleTest : public testing::TestWithParam<TimeSeriesTestParam> {
+};
+
+/// Tests that pretty-printing a ChunkedTimeSeriesCounter limits the number or printed
+/// samples to 64 or lower.
+TEST_P(TimeSeriesCounterResampleTest, TestPrettyPrint) {
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+
+  const TimeSeriesTestParam& param = GetParam();
+  const int test_period = FLAGS_periodic_counter_update_period_ms;
+
+  // Add a counter with a sample function that counts up, starting from 0.
+  int value = 0;
+  auto sample_fn = [&value]() { return value++; };
+  // We increase the value of this flag to allow the counter to store enough samples.
+  FLAGS_status_report_interval_ms = 50000;
+  RuntimeProfile::TimeSeriesCounter* counter =
+      profile->AddChunkedTimeSeriesCounter("TestCounter", TUnit::UNIT, sample_fn);
+
+  // Stop counter updates from interfering with the rest of the test.
+  StopAndClearCounter(profile, counter);
+
+  // Reset value after previous values have been retrieved.
+  value = 0;
+  for (int i = 0; i < param.num_samples; ++i) counter->AddSample(test_period);
+
+  int num_samples = 0;
+  int result_period = 0;
+  // Retrieve and validate samples.
+  const int64_t* samples = counter->GetSamplesTest(&num_samples, &result_period);
+  ASSERT_EQ(num_samples, param.num_samples);
+  // No resampling happens with ChunkedTimeSeriesCounter.
+  ASSERT_EQ(result_period, test_period);
+
+  for (int i = 0; i < param.num_samples; ++i) ASSERT_EQ(samples[i], i);
+
+  stringstream pretty;
+  profile->PrettyPrint(&pretty);
+  const string pretty_str = pretty.str();
+
+  for (const char* e : param.expected) EXPECT_STR_CONTAINS(pretty_str, e);
 }
 
+INSTANTIATE_TEST_CASE_P(VariousNumbers, TimeSeriesCounterResampleTest,
+    ::testing::Values(
+    TimeSeriesTestParam(64, {"TestCounter (500.000ms): 0, 1, 2, 3", "61, 62, 63"}),
+
+    TimeSeriesTestParam(65, {"TestCounter (1s000ms): 0, 2, 4, 6,",
+    "60, 62, 64 (Showing 33 of 65 values from Thrift Profile)"}),
+
+    TimeSeriesTestParam(80, {"TestCounter (1s000ms): 0, 2, 4, 6,",
+    "74, 76, 78 (Showing 40 of 80 values from Thrift Profile)"}),
+
+    TimeSeriesTestParam(127, {"TestCounter (1s000ms): 0, 2, 4, 6,",
+    "122, 124, 126 (Showing 64 of 127 values from Thrift Profile)"}),
+
+    TimeSeriesTestParam(128, {"TestCounter (1s000ms): 0, 2, 4, 6,",
+    "122, 124, 126 (Showing 64 of 128 values from Thrift Profile)"}),
+
+    TimeSeriesTestParam(129, {"TestCounter (1s500ms): 0, 3, 6, 9,",
+    "120, 123, 126 (Showing 43 of 129 values from Thrift Profile)"})
+    ));
+
+} // namespace impala
+
 IMPALA_TEST_MAIN();
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 5696486..ec7417b 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -28,6 +28,7 @@
 
 #include "common/object-pool.h"
 #include "gutil/strings/strip.h"
+#include "kudu/util/logging.h"
 #include "rpc/thrift-util.h"
 #include "runtime/mem-tracker.h"
 #include "util/coding-util.h"
@@ -42,6 +43,9 @@
 
 #include "common/names.h"
 
+DECLARE_int32(status_report_interval_ms);
+DECLARE_int32(periodic_counter_update_period_ms);
+
 namespace impala {
 
 // Thread counters name
@@ -138,8 +142,10 @@ RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
 
   if (node.__isset.time_series_counters) {
     for (const TTimeSeriesCounter& val: node.time_series_counters) {
-      profile->time_series_counter_map_[val.name] =
-          pool->Add(new TimeSeriesCounter(val.name, val.unit, val.period_ms, val.values));
+      // Capture all incoming time series counters with the same type since re-sampling
+      // will have happened on the sender side.
+      profile->time_series_counter_map_[val.name] = pool->Add(
+          new ChunkedTimeSeriesCounter(val.name, val.unit, val.period_ms, val.values));
     }
   }
 
@@ -310,10 +316,13 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
       const TTimeSeriesCounter& c = node.time_series_counters[i];
       TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(c.name);
       if (it == time_series_counter_map_.end()) {
-        time_series_counter_map_[c.name] =
-            pool_->Add(new TimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
+        // Capture all incoming time series counters with the same type since re-sampling
+        // will have happened on the sender side.
+        time_series_counter_map_[c.name] = pool_->Add(
+            new ChunkedTimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
       } else {
-        it->second->samples_.SetSamples(c.period_ms, c.values);
+        int64_t start_idx = c.__isset.start_index ? c.start_index : 0;
+        it->second->SetSamples(c.period_ms, c.values, start_idx);
       }
     }
   }
@@ -618,7 +627,7 @@ ADD_COUNTER_IMPL(AddConcurrentTimerCounter, ConcurrentTimerCounter);
 
 RuntimeProfile::DerivedCounter* RuntimeProfile::AddDerivedCounter(
     const string& name, TUnit::type unit,
-    const DerivedCounterFunction& counter_fn, const string& parent_counter_name) {
+    const SampleFunction& counter_fn, const string& parent_counter_name) {
   DCHECK_EQ(is_averaged_profile_, false);
   lock_guard<SpinLock> l(counter_map_lock_);
   if (counter_map_.find(name) != counter_map_.end()) return NULL;
@@ -645,7 +654,7 @@ RuntimeProfile::ThreadCounters* RuntimeProfile::AddThreadCounters(
   return counter;
 }
 
-void RuntimeProfile::AddLocalTimeCounter(const DerivedCounterFunction& counter_fn) {
+void RuntimeProfile::AddLocalTimeCounter(const SampleFunction& counter_fn) {
   DerivedCounter* local_time_counter = pool_->Add(
       new DerivedCounter(TUnit::TIME_NS, counter_fn));
   lock_guard<SpinLock> l(counter_map_lock_);
@@ -768,22 +777,29 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
   {
     // Print all time series counters as following:
     //    - <Name> (<period>): <val1>, <val2>, <etc>
-    SpinLock* lock;
-    int num, period;
     lock_guard<SpinLock> l(counter_map_lock_);
     for (const TimeSeriesCounterMap::value_type& v: time_series_counter_map_) {
-      const int64_t* samples = v.second->samples_.GetSamples(&num, &period, &lock);
+      const TimeSeriesCounter* counter = v.second;
+      lock_guard<SpinLock> l(counter->lock_);
+      int num, period;
+      const int64_t* samples = counter->GetSamplesLocked(&num, &period);
       if (num > 0) {
-        stream << prefix << "   - " << v.first << "("
-               << PrettyPrinter::Print(period * 1000000L, TUnit::TIME_NS)
-               << "): ";
-        for (int i = 0; i < num; ++i) {
-          stream << PrettyPrinter::Print(samples[i], v.second->unit_);
-          if (i != num - 1) stream << ", ";
+        // Clamp number of printed values at 64, the maximum number of values in the
+        // SamplingTimeSeriesCounter.
+        int step = 1 + (num - 1) / 64;
+        period *= step;
+        stream << prefix << "   - " << v.first << " ("
+               << PrettyPrinter::Print(period * 1000000L, TUnit::TIME_NS) << "): ";
+        for (int i = 0; i < num; i += step) {
+          stream << PrettyPrinter::Print(samples[i], counter->unit());
+          if (i + step < num) stream << ", ";
+        }
+        if (step > 1) {
+          stream << " (Showing " << ((num + 1) / step) << " of " << num << " values from "
+            "Thrift Profile)";
         }
         stream << endl;
       }
-      lock->unlock();
     }
   }
 
@@ -1068,7 +1084,7 @@ RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
 }
 
 RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
-    const string& name, DerivedCounterFunction fn, TUnit::type dst_unit) {
+    const string& name, SampleFunction fn, TUnit::type dst_unit) {
   lock_guard<SpinLock> l(counter_map_lock_);
   bool created;
   Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
@@ -1095,7 +1111,7 @@ RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
 }
 
 RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
-    const string& name, DerivedCounterFunction sample_fn) {
+    const string& name, SampleFunction sample_fn) {
   lock_guard<SpinLock> l(counter_map_lock_);
   bool created;
   Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
@@ -1173,44 +1189,153 @@ RuntimeProfile::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
   return counter;
 }
 
-RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddTimeSeriesCounter(
-    const string& name, TUnit::type unit, DerivedCounterFunction fn) {
+RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
+    const string& name, TUnit::type unit, SampleFunction fn) {
   DCHECK(fn != nullptr);
   lock_guard<SpinLock> l(counter_map_lock_);
   TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
   if (it != time_series_counter_map_.end()) return it->second;
-  TimeSeriesCounter* counter = pool_->Add(new TimeSeriesCounter(name, unit, fn));
+  TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name, unit, fn));
   time_series_counter_map_[name] = counter;
   PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
   has_active_periodic_counters_ = true;
   return counter;
 }
 
-RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddTimeSeriesCounter(
+RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
     const string& name, Counter* src_counter) {
   DCHECK(src_counter != NULL);
-  return AddTimeSeriesCounter(name, src_counter->unit(),
+  return AddSamplingTimeSeriesCounter(name, src_counter->unit(),
       bind(&Counter::value, src_counter));
 }
 
-void RuntimeProfile::TimeSeriesCounter::ToThrift(TTimeSeriesCounter* counter) {
-  counter->name = name_;
-  counter->unit = unit_;
+void RuntimeProfile::TimeSeriesCounter::AddSample(int ms_elapsed) {
+  lock_guard<SpinLock> l(lock_);
+  int64_t sample = sample_fn_();
+  AddSampleLocked(sample, ms_elapsed);
+}
+
+const int64_t* RuntimeProfile::TimeSeriesCounter::GetSamplesLockedForSend(
+    int* num_samples, int* period) {
+  return GetSamplesLocked(num_samples, period);
+}
+
+void RuntimeProfile::TimeSeriesCounter::SetSamples(
+      int period, const std::vector<int64_t>& samples, int64_t start_idx) {
+  DCHECK(false);
+}
+
+void RuntimeProfile::SamplingTimeSeriesCounter::AddSampleLocked(
+    int64_t sample, int ms_elapsed){
+  samples_.AddSample(sample, ms_elapsed);
+}
+
+const int64_t* RuntimeProfile::SamplingTimeSeriesCounter::GetSamplesLocked(
+    int* num_samples, int* period) const {
+  return samples_.GetSamples(num_samples, period);
+}
+
+RuntimeProfile::ChunkedTimeSeriesCounter::ChunkedTimeSeriesCounter(
+    const string& name, TUnit::type unit, SampleFunction fn)
+  : TimeSeriesCounter(name, unit, fn)
+  , period_(FLAGS_periodic_counter_update_period_ms)
+  , max_size_(10 * FLAGS_status_report_interval_ms / period_) {}
+
+void RuntimeProfile::ChunkedTimeSeriesCounter::Clear() {
+  lock_guard<SpinLock> l(lock_);
+  previous_sample_count_ += last_get_count_;
+  values_.erase(values_.begin(), values_.begin() + last_get_count_);
+  last_get_count_ = 0;
+}
+
+void RuntimeProfile::ChunkedTimeSeriesCounter::AddSampleLocked(
+    int64_t sample, int ms_elapsed) {
+  // We chose inefficiently erasing elements from a vector over using a std::deque because
+  // this should only happen very infrequently and we rely on contiguous storage in
+  // GetSamplesLocked*().
+  if (max_size_ > 0 && values_.size() == max_size_) {
+    KLOG_EVERY_N_SECS(WARNING, 60) << "ChunkedTimeSeriesCounter reached maximum size";
+    values_.erase(values_.begin(), values_.begin() + 1);
+  }
+  DCHECK_LT(values_.size(), max_size_);
+  values_.push_back(sample);
+}
+
+const int64_t* RuntimeProfile::ChunkedTimeSeriesCounter::GetSamplesLocked(
+    int* num_samples, int* period) const {
+  DCHECK(num_samples != nullptr);
+  DCHECK(period != nullptr);
+  *num_samples = values_.size();
+  *period = period_;
+  return values_.data();
+}
+
+const int64_t* RuntimeProfile::ChunkedTimeSeriesCounter::GetSamplesLockedForSend(
+    int* num_samples, int* period) {
+  last_get_count_ = values_.size();
+  return GetSamplesLocked(num_samples, period);
+}
+
+void RuntimeProfile::ChunkedTimeSeriesCounter::SetSamples(
+    int period, const std::vector<int64_t>& samples, int64_t start_idx) {
+  lock_guard<SpinLock> l(lock_);
+  if (start_idx == 0) {
+    // This could be coming from a SamplingTimeSeriesCounter or another
+    // ChunkedTimeSeriesCounter.
+    period_ = period;
+    values_ = samples;
+    return;
+  }
+  // Only ChunkedTimeSeriesCounter will set start_idx > 0.
+  DCHECK_GT(start_idx, 0);
+  DCHECK_EQ(period_, period);
+  if (values_.size() < start_idx) {
+    // Fill up with 0.
+    values_.resize(start_idx);
+  }
+  DCHECK_GE(values_.size(), start_idx);
+  // Skip values we already received.
+  auto start_it = samples.begin() + values_.size() - start_idx;
+  values_.insert(values_.end(), start_it, samples.end());
+}
 
+RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddChunkedTimeSeriesCounter(
+    const string& name, TUnit::type unit, SampleFunction fn) {
+  DCHECK(fn != nullptr);
+  lock_guard<SpinLock> l(counter_map_lock_);
+  TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
+  if (it != time_series_counter_map_.end()) return it->second;
+  TimeSeriesCounter* counter = pool_->Add(new ChunkedTimeSeriesCounter(name, unit, fn));
+  time_series_counter_map_[name] = counter;
+  PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
+  has_active_periodic_counters_ = true;
+  return counter;
+}
+
+void RuntimeProfile::ClearChunkedTimeSeriesCounters() {
+  {
+    lock_guard<SpinLock> l(counter_map_lock_);
+    for (auto& it : time_series_counter_map_) it.second->Clear();
+  }
+  {
+    lock_guard<SpinLock> l(children_lock_);
+    for (int i = 0; i < children_.size(); ++i) {
+      children_[i].first->ClearChunkedTimeSeriesCounters();
+    }
+  }
+}
+
+void RuntimeProfile::TimeSeriesCounter::ToThrift(TTimeSeriesCounter* counter) {
+  lock_guard<SpinLock> l(lock_);
   int num, period;
-  SpinLock* lock;
-  const int64_t* samples = samples_.GetSamples(&num, &period, &lock);
+  const int64_t* samples = GetSamplesLockedForSend(&num, &period);
   counter->values.resize(num);
   Ubsan::MemCpy(counter->values.data(), samples, num * sizeof(int64_t));
-  lock->unlock();
-  counter->period_ms = period;
-}
 
-string RuntimeProfile::TimeSeriesCounter::DebugString() const {
-  stringstream ss;
-  ss << "Counter=" << name_ << endl
-     << samples_.DebugString();
-  return ss.str();
+  counter->name = name_;
+  counter->unit = unit_;
+  counter->period_ms = period;
+  counter->__set_start_index(previous_sample_count_);
 }
 
 void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) {
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 8bba192..9203785 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -40,6 +40,43 @@ class ObjectPool;
 /// single thread per process that will convert an amount (i.e. bytes) counter to a
 /// corresponding rate based counter.  This thread wakes up at fixed intervals and updates
 /// all of the rate counters.
+///
+/// Runtime profile counters can be of several types. See their definition in
+/// runtime-profile-counters.h for more details.
+///
+/// - Counter: Tracks a single value or bitmap. Also serves as the base class for several
+///   |   other counters.
+///   |
+///   - AveragedCounter: Maintains a set of child counters. Its current value is the
+///   |     average of the current values of its children.
+///   |
+///   - ConcurrentTimerCounter: Wraps a ConcurrentStopWatch to track concurrent running
+///   |     time for multiple threads.
+///   |
+///   - DerivedCounter: Computes its current value by calling a function passed during
+///   |     construction.
+///   |
+///   - HighWaterMarkCounter: Keeps track of the highest value seen so far.
+///   |
+///   - SummaryStatsCounter: Keeps track of minimum, maximum, and average value of all
+///         values seen so far.
+///
+/// - EventSequence: Captures a sequence of events, each added by calling MarkEvent().
+///       Events have a text label and a time, relative to when the sequence was started.
+///
+/// - ThreadCounters: Tracks thread runtime information, such as total time, user time,
+///       sys time.
+///
+/// - TimeSeriesCounter (abstract): Keeps track of a value over time. Has two
+///   |   implementations.
+///   |
+///   - SamplingTimeSeriesCounter: Maintains a fixed array of 64 values and resamples if
+///   |     more value than that are added.
+///   |
+///   - ChunkedTimeSeriesCounter: Maintains an unbounded vector of values. Supports
+///         clearing its values after they have been retrieved, and will track the number
+///         of previously retrieved values.
+///
 /// All methods are thread-safe unless otherwise mentioned.
 class RuntimeProfile { // NOLINT: This struct is not packed, but there are not so many
                        // of them that it makes a performance difference
@@ -93,13 +130,15 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   class AveragedCounter;
   class ConcurrentTimerCounter;
   class DerivedCounter;
-  class EventSequence;
   class HighWaterMarkCounter;
   class SummaryStatsCounter;
+  class EventSequence;
   class ThreadCounters;
   class TimeSeriesCounter;
+  class SamplingTimeSeriesCounter;
+  class ChunkedTimeSeriesCounter;
 
-  typedef boost::function<int64_t ()> DerivedCounterFunction;
+  typedef boost::function<int64_t ()> SampleFunction;
 
   /// Create a runtime profile object with 'name'. The profile, counters and any other
   /// structures owned by the profile are allocated from 'pool'.
@@ -182,7 +221,7 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// parent_counter_name.
   /// Returns NULL if the counter already exists.
   DerivedCounter* AddDerivedCounter(const std::string& name, TUnit::type unit,
-      const DerivedCounterFunction& counter_fn,
+      const SampleFunction& counter_fn,
       const std::string& parent_counter_name = "");
 
   /// Add a set of thread counters prefixed with 'prefix'. Returns a ThreadCounters object
@@ -191,7 +230,7 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
 
   // Add a derived counter to capture the local time. This function can be called at most
   // once.
-  void AddLocalTimeCounter(const DerivedCounterFunction& counter_fn);
+  void AddLocalTimeCounter(const SampleFunction& counter_fn);
 
   /// Gets the counter object with 'name'.  Returns NULL if there is no counter with
   /// that name.
@@ -332,7 +371,7 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
 
   /// Same as 'AddRateCounter' above except values are taken by calling fn.
   /// The resulting counter will be of 'unit'.
-  Counter* AddRateCounter(const std::string& name, DerivedCounterFunction fn,
+  Counter* AddRateCounter(const std::string& name, SampleFunction fn,
       TUnit::type unit);
 
   /// Add a sampling counter to the current profile based on src_counter with name.
@@ -345,7 +384,7 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   Counter* AddSamplingCounter(const std::string& name, Counter* src_counter);
 
   /// Same as 'AddSamplingCounter' above except the samples are taken by calling fn.
-  Counter* AddSamplingCounter(const std::string& name, DerivedCounterFunction fn);
+  Counter* AddSamplingCounter(const std::string& name, SampleFunction fn);
 
   /// Create a set of counters, one per bucket, to store the sampled value of src_counter.
   /// The 'src_counter' is sampled periodically to obtain the index of the bucket to
@@ -363,17 +402,30 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// PeriodicCounterUpdater::StopBucketingCounters() if 'buckets' stops changing.
   std::vector<Counter*>* AddBucketingCounters(Counter* src_counter, int num_buckets);
 
-  /// Create a time series counter. This begins sampling immediately. This counter
-  /// contains a number of samples that are collected periodically by calling sample_fn().
-  /// StopPeriodicCounters() must be called to stop the periodic updating before this
-  /// profile is destroyed. The periodic updating can be stopped earlier by calling
-  /// PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing.
+  /// Creates a sampling time series counter. This begins sampling immediately. This
+  /// counter contains a number of samples that are collected periodically by calling
+  /// sample_fn(). StopPeriodicCounters() must be called to stop the periodic updating
+  /// before this profile is destroyed. The periodic updating can be stopped earlier by
+  /// calling PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing.
   /// Note: these counters don't get merged (to make average profiles)
-  TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name,
-      TUnit::type unit, DerivedCounterFunction sample_fn);
+  TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name,
+      TUnit::type unit, SampleFunction sample_fn);
 
   /// Same as above except the samples are collected from 'src_counter'.
-  TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name, Counter* src_counter);
+  TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name, Counter*
+      src_counter);
+
+  /// Adds a chunked time series counter to the profile. This begins sampling immediately.
+  /// This counter will collect new samples periodically by calling 'sample_fn()'. Samples
+  /// are not re-sampled into larger intervals, instead owners of this profile can call
+  /// ClearChunkedTimeSeriesCounters() to reset the sample buffers of all chunked time
+  /// series counters, e.g. after their current values have been transmitted to a remote
+  /// node for profile aggregation.
+  TimeSeriesCounter* AddChunkedTimeSeriesCounter(
+      const std::string& name, TUnit::type unit, SampleFunction sample_fn);
+
+  /// Clear all chunked time series counters in this profile and all children.
+  void ClearChunkedTimeSeriesCounters();
 
   /// Recursively compute the fraction of the 'total_time' spent in this profile and
   /// its children.
diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h
index 027f330..d517d37 100644
--- a/be/src/util/streaming-sampler.h
+++ b/be/src/util/streaming-sampler.h
@@ -20,9 +20,7 @@
 
 #include <string.h>
 #include <iostream>
-#include <boost/thread/lock_guard.hpp>
 
-#include "util/spinlock.h"
 #include "util/ubsan.h"
 
 namespace impala {
@@ -33,9 +31,13 @@ namespace impala {
 /// are collapsed and the collection period is doubled.
 /// The input period and the streaming sampler period do not need to match, the
 /// streaming sampler will average values.
-/// T is the type of the sample and must be a native numerical type (e.g. int or float).
+/// T is the type of the sample and must be a native numerical type which fulfills
+/// std::is_arithmetic (e.g. int or float).
+///
+/// This class is not thread-safe.
 template<typename T, int MAX_SAMPLES>
 class StreamingSampler {
+  static_assert(std::is_arithmetic<T>::value, "Numerical type required");
  public:
   StreamingSampler(int initial_period = 500)
     : samples_collected_(0) ,
@@ -45,17 +47,6 @@ class StreamingSampler {
       current_sample_total_time_(0) {
   }
 
-  /// Initialize the sampler with values.
-  StreamingSampler(int period, const std::vector<T>& initial_samples)
-    : samples_collected_(initial_samples.size()),
-      period_(period),
-      current_sample_sum_(0),
-      current_sample_count_(0),
-      current_sample_total_time_(0) {
-    DCHECK_LE(samples_collected_, MAX_SAMPLES);
-    Ubsan::MemCpy(samples_, initial_samples.data(), sizeof(T) * samples_collected_);
-  }
-
   /// Add a sample to the sampler. 'ms' is the time elapsed since the last time this
   /// was called.
   /// The input value is accumulated into current_*. If the total time elapsed
@@ -65,7 +56,6 @@ class StreamingSampler {
   /// TODO: we can make this more complex by taking a weighted average of samples
   /// accumulated in a period.
   void AddSample(T sample, int ms) {
-    boost::lock_guard<SpinLock> l(lock_);
     ++current_sample_count_;
     current_sample_sum_ += sample;
     current_sample_total_time_ += ms;
@@ -87,49 +77,15 @@ class StreamingSampler {
     }
   }
 
-  /// Get the samples collected.  Returns the number of samples and
-  /// the period they were collected at.
-  /// If lock is non-null, the lock will be taken before returning. The caller
-  /// must unlock it.
-  const T* GetSamples(int* num_samples, int* period, SpinLock** lock = NULL) const {
-    if (lock != NULL) {
-      lock_.lock();
-      *lock = &lock_;
-    }
+  /// Get the samples collected.  Returns the number of samples and the period they were
+  /// collected at.
+  const T* GetSamples(int* num_samples, int* period) const {
     *num_samples = samples_collected_;
     *period = period_;
     return samples_;
   }
 
-  /// Set the underlying data to period/samples
-  void SetSamples(int period, const std::vector<T>& samples) {
-    DCHECK_LE(samples.size(), MAX_SAMPLES);
-
-    boost::lock_guard<SpinLock> l(lock_);
-    period_ = period;
-    samples_collected_ = samples.size();
-    Ubsan::MemCpy(samples_, samples.data(), sizeof(T) * samples_collected_);
-    current_sample_sum_ = 0;
-    current_sample_count_ = 0;
-    current_sample_total_time_ = 0;
-  }
-
-  std::string DebugString(const std::string& prefix="") const {
-    boost::lock_guard<SpinLock> l(lock_);
-    std::stringstream ss;
-    ss << prefix << "Period = " << period_ << std::endl
-       << prefix << "Num = " << samples_collected_ << std::endl
-       << prefix << "Samples = {";
-    for (int i = 0; i < samples_collected_; ++i) {
-      ss << samples_[i] << ", ";
-    }
-    ss << prefix << "}" << std::endl;
-    return ss.str();
-  }
-
  private:
-  mutable SpinLock lock_;
-
   /// Aggregated samples collected. Note: this is not all the input samples from
   /// AddSample(), as logically, those samples get resampled and aggregated.
   T samples_[MAX_SAMPLES];
diff --git a/be/src/util/system-state-info-test.cc b/be/src/util/system-state-info-test.cc
new file mode 100644
index 0000000..561b2d6
--- /dev/null
+++ b/be/src/util/system-state-info-test.cc
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/atomic.h"
+#include "testutil/gtest-util.h"
+#include "util/system-state-info.h"
+#include "util/time.h"
+
+#include <thread>
+
+namespace impala {
+
+class SystemStateInfoTest : public testing::Test {
+ protected:
+  SystemStateInfo info;
+};
+
+TEST_F(SystemStateInfoTest, FirstCallReturnsZero) {
+  const SystemStateInfo::CpuUsageRatios& r = info.GetCpuUsageRatios();
+  EXPECT_EQ(0, r.user + r.system + r.iowait);
+}
+
+// Smoke test to make sure that we read non-zero values from /proc/stat.
+TEST_F(SystemStateInfoTest, ReadProcStat) {
+  info.ReadCurrentProcStat();
+  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cur_val_idx_];
+  EXPECT_GT(state[SystemStateInfo::CPU_USER], 0);
+  EXPECT_GT(state[SystemStateInfo::CPU_SYSTEM], 0);
+  EXPECT_GT(state[SystemStateInfo::CPU_IDLE], 0);
+  EXPECT_GT(state[SystemStateInfo::CPU_IOWAIT], 0);
+}
+
+// Tests parsing a line similar to the first line of /proc/stat.
+TEST_F(SystemStateInfoTest, ParseProcStat) {
+  // Fields are: user nice system idle iowait irq softirq steal guest guest_nice
+  info.ReadProcStatString("cpu  20 30 10 70 100 0 0 0 0 0");
+  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cur_val_idx_];
+  EXPECT_EQ(state[SystemStateInfo::CPU_USER], 20);
+  EXPECT_EQ(state[SystemStateInfo::CPU_SYSTEM], 10);
+  EXPECT_EQ(state[SystemStateInfo::CPU_IDLE], 70);
+  EXPECT_EQ(state[SystemStateInfo::CPU_IOWAIT], 100);
+
+  // Test that values larger than INT_MAX parse without error.
+  info.ReadProcStatString("cpu  3000000000 30 10 70 100 0 0 0 0 0");
+  const SystemStateInfo::CpuValues& changed_state = info.cpu_values_[info.cur_val_idx_];
+  EXPECT_EQ(changed_state[SystemStateInfo::CPU_USER], 3000000000);
+}
+
+// Smoke test for the public interface.
+TEST_F(SystemStateInfoTest, GetCpuUsageRatios) {
+  AtomicBool running(true);
+  // Generate some load to observe counters > 0.
+  std::thread t([&running]() { while (running.Load()); });
+  for (int i = 0; i < 3; ++i) {
+    SleepForMs(200);
+    info.CaptureSystemStateSnapshot();
+    const SystemStateInfo::CpuUsageRatios& r = info.GetCpuUsageRatios();
+    EXPECT_GT(r.user + r.system + r.iowait, 0);
+  }
+  running.Store(false);
+  t.join();
+}
+
+// Tests the computation logic.
+TEST_F(SystemStateInfoTest, ComputeCpuRatios) {
+  info.ReadProcStatString("cpu  20 30 10 70 100 0 0 0 0 0");
+  info.ReadProcStatString("cpu  30 30 20 70 100 0 0 0 0 0");
+  info.ComputeCpuRatios();
+  const SystemStateInfo::CpuUsageRatios& r = info.GetCpuUsageRatios();
+  EXPECT_EQ(r.user, 5000);
+  EXPECT_EQ(r.system, 5000);
+  EXPECT_EQ(r.iowait, 0);
+}
+
+} // namespace impala
+
+IMPALA_TEST_MAIN();
diff --git a/be/src/util/system-state-info.cc b/be/src/util/system-state-info.cc
new file mode 100644
index 0000000..ea3803b
--- /dev/null
+++ b/be/src/util/system-state-info.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gutil/strings/split.h"
+#include "util/error-util.h"
+#include "util/string-parser.h"
+#include "util/system-state-info.h"
+
+#include <algorithm>
+#include <fstream>
+#include <iostream>
+
+#include "common/names.h"
+
+using std::accumulate;
+
+namespace impala {
+
+// Partially initializing cpu_ratios_ will default-initialize the remaining members.
+SystemStateInfo::SystemStateInfo() {
+  memset(&cpu_ratios_, 0, sizeof(cpu_ratios_));
+  ReadCurrentProcStat();
+}
+
+void SystemStateInfo::CaptureSystemStateSnapshot() {
+  // Capture and compute CPU usage
+  ReadCurrentProcStat();
+  ComputeCpuRatios();
+}
+
+int64_t SystemStateInfo::ParseInt64(const string& val) const {
+  StringParser::ParseResult result;
+  int64_t parsed = StringParser::StringToInt<int64_t>(val.c_str(), val.size(), &result);
+  if (result == StringParser::PARSE_SUCCESS) return parsed;
+  return -1;
+}
+
+void SystemStateInfo::ReadFirstLineFromFile(const char* path, string* out) const {
+  ifstream proc_file(path);
+  if (!proc_file.is_open()) {
+    LOG(WARNING) << "Could not open " << path << ": " << GetStrErrMsg() << endl;
+    return;
+  }
+  DCHECK(proc_file.is_open());
+  DCHECK(out != nullptr);
+  getline(proc_file, *out);
+}
+
+void SystemStateInfo::ReadCurrentProcStat() {
+  string line;
+  ReadFirstLineFromFile("/proc/stat", &line);
+  ReadProcStatString(line);
+}
+
+void SystemStateInfo::ReadProcStatString(const string& stat_string) {
+  stringstream ss(stat_string);
+
+  // Skip the first value ('cpu  ');
+  ss.ignore(5);
+
+  cur_val_idx_ = 1 - cur_val_idx_;
+  CpuValues& next_values = cpu_values_[cur_val_idx_];
+
+  for (int i = 0; i < NUM_CPU_VALUES; ++i) {
+    int64_t v = -1;
+    ss >> v;
+    DCHECK_GE(v, 0) << "Value " << i << ": " << v;
+    // Clamp invalid entries at 0
+    next_values[i] = max(v, 0L);
+  }
+}
+
+void SystemStateInfo::ComputeCpuRatios() {
+  const CpuValues& cur = cpu_values_[cur_val_idx_];
+  const CpuValues& old = cpu_values_[1 - cur_val_idx_];
+
+  // Sum up all counters
+  int64_t cur_sum = accumulate(cur.begin(), cur.end(), 0);
+  int64_t old_sum = accumulate(old.begin(), old.end(), 0);
+
+  int64_t total_tics = cur_sum - old_sum;
+  // If less than 1/USER_HZ time has time has passed for any of the counters, the ratio is
+  // zero (to avoid dividing by zero).
+  if (total_tics == 0) {
+    memset(&cpu_ratios_, 0, sizeof(cpu_ratios_));
+    return;
+  }
+  DCHECK_GT(total_tics, 0);
+  // Convert each ratio to basis points.
+  const int BASIS_MAX = 10000;
+  cpu_ratios_.user = ((cur[CPU_USER] - old[CPU_USER]) * BASIS_MAX) / total_tics;
+  cpu_ratios_.system = ((cur[CPU_SYSTEM] - old[CPU_SYSTEM]) * BASIS_MAX) / total_tics;
+  cpu_ratios_.iowait = ((cur[CPU_IOWAIT] - old[CPU_IOWAIT]) * BASIS_MAX) / total_tics;
+}
+
+} // namespace impala
diff --git a/be/src/util/system-state-info.h b/be/src/util/system-state-info.h
new file mode 100644
index 0000000..df2068f
--- /dev/null
+++ b/be/src/util/system-state-info.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <array>
+#include <cstdint>
+#include <string>
+
+#include <gtest/gtest_prod.h> // for FRIEND_TEST
+
+#include "common/names.h"
+namespace impala {
+
+/// Utility class to track and compute system resource consumption.
+///
+/// This class can be used to capture snapshots of various metrics of system resource
+/// consumption (e.g. CPU usage) and compute usage ratios and derived metrics between
+/// subsequent snapshots. Users of this class must call CaptureSystemStateSnapshot() and
+/// can then obtain various resource utilization metrics through getter methods (e.g.
+/// GetCpuUsageRatios()).
+class SystemStateInfo {
+ public:
+  SystemStateInfo();
+  /// Takes a snapshot of the current system resource usage and compute the usage ratios
+  /// for the time since the previous snapshot was taken.
+  void CaptureSystemStateSnapshot();
+
+  /// Ratios use basis points as their unit (1/100th of a percent, i.e. 0.0001).
+  struct CpuUsageRatios {
+    int32_t user;
+    int32_t system;
+    int32_t iowait;
+  };
+  /// Returns a struct containing the CPU usage ratios for the interval between the last
+  /// two calls to CaptureSystemStateSnapshot().
+  const CpuUsageRatios& GetCpuUsageRatios() { return cpu_ratios_; }
+
+ private:
+  int64_t ParseInt64(const std::string& val) const;
+  void ReadFirstLineFromFile(const char* path, std::string* out) const;
+
+  /// Rotates 'cur_val_idx_' and reads the current CPU usage values from /proc/stat into
+  /// the current set of values.
+  void ReadCurrentProcStat();
+
+  /// Rotates 'cur_val_idx_' and reads the CPU usage values from 'stat_string' into the
+  /// current set of values.
+  void ReadProcStatString(const string& stat_string);
+
+  /// Computes the CPU usage ratios for the interval between the last two calls to
+  /// CaptureSystemStateSnapshot() and stores the result in 'cpu_ratios_'.
+  void ComputeCpuRatios();
+
+  enum PROC_STAT_CPU_VALUES {
+    CPU_USER = 0,
+    CPU_NICE,
+    CPU_SYSTEM,
+    CPU_IDLE,
+    CPU_IOWAIT,
+    NUM_CPU_VALUES
+  };
+
+  /// We store the CPU usage values in an array so that we can iterate over them, e.g.
+  /// when reading them from a file or summing them up.
+  typedef std::array<int64_t, NUM_CPU_VALUES> CpuValues;
+  /// Two buffers to keep the current and previous set of CPU usage values.
+  CpuValues cpu_values_[2];
+  int cur_val_idx_ = 0;
+
+  /// The computed CPU usage ratio between the current and previous snapshots in
+  /// cpu_values_. Updated in ComputeCpuRatios().
+  CpuUsageRatios cpu_ratios_;
+
+  FRIEND_TEST(SystemStateInfoTest, ComputeCpuRatios);
+  FRIEND_TEST(SystemStateInfoTest, ParseProcStat);
+  FRIEND_TEST(SystemStateInfoTest, ReadProcStat);
+};
+
+} // namespace impala
diff --git a/bin/parse-thrift-profile.py b/bin/parse-thrift-profile.py
index 5f8485f..55cf1f2 100755
--- a/bin/parse-thrift-profile.py
+++ b/bin/parse-thrift-profile.py
@@ -39,14 +39,8 @@
 # 2018-04-13T15:06:34.144000 e44af7f93edb8cd6:1b1f801600000000 TRuntimeProfileTree(nodes=[TRuntimeProf...
 
 
-from thrift.protocol import TCompactProtocol
-from thrift.TSerialization import deserialize
-from RuntimeProfile.ttypes import TRuntimeProfileTree
-
-import base64
-import datetime
+from impala_py_lib import profiles
 import sys
-import zlib
 
 if len(sys.argv) == 1 or sys.argv[1] == "-":
   input_data = sys.stdin
@@ -57,23 +51,5 @@ else:
   sys.exit(1)
 
 for line in input_data:
-  space_separated = line.split(" ")
-  if len(space_separated) == 3:
-    ts = int(space_separated[0])
-    print datetime.datetime.fromtimestamp(ts/1000.0).isoformat(), space_separated[1],
-    base64_encoded = space_separated[2]
-  elif len(space_separated) == 1:
-    base64_encoded = space_separated[0]
-  else:
-    raise Exception("Unexpected line: " + line)
-  possibly_compressed = base64.b64decode(base64_encoded)
-  # Handle both compressed and uncompressed Thrift profiles
-  try:
-    thrift = zlib.decompress(possibly_compressed)
-  except zlib.error:
-    thrift = possibly_compressed
-
-  tree = TRuntimeProfileTree()
-  deserialize(tree, thrift, protocol_factory=TCompactProtocol.TCompactProtocolFactory())
-  tree.validate()
+  tree = profiles.decode_profile_line(line)
   print tree
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index a190d83..ee8f369 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -311,6 +311,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   74: optional string client_identifier;
+
+  // See comment in ImpalaService.thrift
+  75: optional double resource_trace_ratio = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
@@ -453,6 +456,10 @@ struct TQueryCtx {
   // stats and key column predicate selectivity. Generally only disabled
   // for testing.
   20: optional bool disable_hbase_row_est = false;
+
+  // Flag to enable tracing of resource usage consumption for all fragment instances of a
+  // query. Set in ImpalaServer::PrepareQueryContext().
+  21: required bool trace_resource_usage = false
 }
 
 // Specification of one output destination of a plan fragment
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 2986cd9..bb3b0af 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -350,7 +350,12 @@ enum TImpalaQueryOptions {
   // An opaque string, not used by Impala itself, that can be used to identify
   // the client, like a User-Agent in HTTP. Drivers should set this to
   // their version number. May also be used by tests to help identify queries.
-  CLIENT_IDENTIFIER
+  CLIENT_IDENTIFIER,
+
+  // Probability to enable tracing of resource usage consumption on all fragment instance
+  // executors of a query. Must be between 0 and 1 inclusive, 0 means no query will be
+  // traced, 1 means all queries will be traced.
+  RESOURCE_TRACE_RATIO,
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Metrics.thrift b/common/thrift/Metrics.thrift
index 4f2c7f2..f97eda0 100644
--- a/common/thrift/Metrics.thrift
+++ b/common/thrift/Metrics.thrift
@@ -29,6 +29,9 @@ enum TUnit {
   BYTES_PER_SECOND,
   TIME_NS,
   DOUBLE_VALUE,
+  // 100th of a percent, used to express ratios etc., range from 0 to 10000, pretty
+  // printed as integer percentages from 0 to 100.
+  BASIS_POINTS,
   // No units at all, may not be a numerical quantity
   NONE,
   TIME_MS,
diff --git a/common/thrift/RuntimeProfile.thrift b/common/thrift/RuntimeProfile.thrift
index f131564..d47afa6 100644
--- a/common/thrift/RuntimeProfile.thrift
+++ b/common/thrift/RuntimeProfile.thrift
@@ -65,6 +65,12 @@ struct TTimeSeriesCounter {
 
   // The sampled values.
   4: required list<i64> values
+
+  // The index of the first value in this series (this is equal to the total number of
+  // values contained in previous updates for this counter). Values > 0 mean that this
+  // series contains an interval of a larger series. For values > 0, period_ms should be
+  // ignored, as chunked counters don't resample their values.
+  5: optional i64 start_index
 }
 
 // Thrift version of RuntimeProfile::SummaryStatsCounter.
@@ -133,4 +139,5 @@ struct TRuntimeProfileTree {
 // A list of TRuntimeProfileTree structures.
 struct TRuntimeProfileForest {
   1: required list<TRuntimeProfileTree> profile_trees
+  2: optional TRuntimeProfileTree host_profile
 }
diff --git a/bin/parse-thrift-profile.py b/lib/python/impala_py_lib/profiles.py
old mode 100755
new mode 100644
similarity index 58%
copy from bin/parse-thrift-profile.py
copy to lib/python/impala_py_lib/profiles.py
index 5f8485f..a2f6aeb
--- a/bin/parse-thrift-profile.py
+++ b/lib/python/impala_py_lib/profiles.py
@@ -1,5 +1,4 @@
 #!/usr/bin/env impala-python
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,51 +15,22 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-# Parses a base64-encoded profile provided via stdin. It accepts
-# three common formats:
-#
-# 1. Impala profile logs of the format
-#    "<ts> <queryid> <base64encoded, compressed thrift profile>"
-# 2. Just the base64-encoded compressed thrift profile
-# 3. Base-64 encoded uncompressed thrift profile.
-#
-# In all cases, the script expects one profile per line.
-#
-# For example:
-#
-# $ cat logs/cluster_test/custom_cluster_tests/profiles/impala_profile_log \
-#      | head -n 1 | awk '{ print $3 }' | parse-profile.py
-# TRuntimeProfileTree(nodes=[TRuntimeProfileNode(info_strings_display_order=....
-#
-# or
-#
-# $ bin/parse-thrift-profile.py logs/custom_cluster_tests/profiles/impala_profile_log_1.1-1523657191158
-# 2018-04-13T15:06:34.144000 e44af7f93edb8cd6:1b1f801600000000 TRuntimeProfileTree(nodes=[TRuntimeProf...
 
-
-from thrift.protocol import TCompactProtocol
-from thrift.TSerialization import deserialize
-from RuntimeProfile.ttypes import TRuntimeProfileTree
+# This file contains library functions to decode and access Impala query profiles.
 
 import base64
 import datetime
-import sys
 import zlib
+from thrift.protocol import TCompactProtocol
+from thrift.TSerialization import deserialize
+from RuntimeProfile.ttypes import TRuntimeProfileTree
 
-if len(sys.argv) == 1 or sys.argv[1] == "-":
-  input_data = sys.stdin
-elif len(sys.argv) == 2:
-  input_data = file(sys.argv[1])
-else:
-  print >> sys.stderr, "Usage: %s [file]" % (sys.argv[0],)
-  sys.exit(1)
 
-for line in input_data:
+def decode_profile_line(line):
   space_separated = line.split(" ")
   if len(space_separated) == 3:
     ts = int(space_separated[0])
-    print datetime.datetime.fromtimestamp(ts/1000.0).isoformat(), space_separated[1],
+    print datetime.datetime.fromtimestamp(ts / 1000.0).isoformat(), space_separated[1]
     base64_encoded = space_separated[2]
   elif len(space_separated) == 1:
     base64_encoded = space_separated[0]
@@ -76,4 +46,5 @@ for line in input_data:
   tree = TRuntimeProfileTree()
   deserialize(tree, thrift, protocol_factory=TCompactProtocol.TCompactProtocolFactory())
   tree.validate()
-  print tree
+
+  return tree
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index 4056011..08f0c72 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -63,6 +63,7 @@ class ImpalaBeeswaxException(Exception):
 class ImpalaBeeswaxResult(object):
   def __init__(self, **kwargs):
     self.query = kwargs.get('query', None)
+    self.query_id = kwargs['query_id']
     self.success = kwargs.get('success', False)
     # Insert returns an int, convert into list to have a uniform data type.
     # TODO: We should revisit this if we have more datatypes to deal with.
@@ -435,7 +436,8 @@ class ImpalaBeeswaxClient(object):
     if query_type == 'use':
       # TODO: "use <database>" does not currently throw an error. Need to update this
       # to handle the error case once that behavior has been changed.
-      return ImpalaBeeswaxResult(query=query_string, success=True, data=[])
+      return ImpalaBeeswaxResult(query=query_string, query_id=query_handle.id,
+                                 success=True, data=[])
 
     # Result fetching for insert is different from other queries.
     exec_result = None
@@ -459,7 +461,8 @@ class ImpalaBeeswaxClient(object):
         break
 
     # The query executed successfully and all the data was fetched.
-    exec_result = ImpalaBeeswaxResult(success=True, data=result_rows, schema=schema)
+    exec_result = ImpalaBeeswaxResult(query_id=handle.id, success=True, data=result_rows,
+                                      schema=schema)
     exec_result.summary = 'Returned %d rows' % (len(result_rows))
     return exec_result
 
@@ -469,7 +472,7 @@ class ImpalaBeeswaxClient(object):
     # The insert was successful
     num_rows = sum(map(int, result.rows_modified.values()))
     data = ["%s: %s" % row for row in result.rows_modified.iteritems()]
-    exec_result = ImpalaBeeswaxResult(success=True, data=data)
+    exec_result = ImpalaBeeswaxResult(query_id=handle.id, success=True, data=data)
     exec_result.summary = "Inserted %d rows" % (num_rows,)
     return exec_result
 
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 9c85e2e..d25170f 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -56,7 +56,7 @@ class TestObservability(ImpalaTestSuite):
 
   def test_broadcast_num_rows(self):
     """Regression test for IMPALA-3002 - checks that the num_rows for a broadcast node
-    in the exec summaty is correctly set as the max over all instances, not the sum."""
+    in the exec summary is correctly set as the max over all instances, not the sum."""
     query = """select distinct a.int_col, a.string_col from functional.alltypes a
         inner join functional.alltypessmall b on (a.id = b.id)
         where a.year = 2009 and b.month = 2"""
@@ -424,58 +424,94 @@ class TestObservability(ImpalaTestSuite):
     assert counters["TotalBytesSent"] == (counters["TotalScanBytesSent"] +
                                           counters["TotalInnerBytesSent"])
 
+  def test_query_profile_contains_host_resource_usage(self):
+    """Tests that the profile contains a sub-profile with per node resource usage."""
+    result = self.execute_query("select count(*), sleep(1000) from functional.alltypes")
+    profile = result.runtime_profile
+    expected_str = "Per Node Profiles:"
+    assert any(expected_str in line for line in profile.splitlines())
+
+  def test_query_profile_host_cpu_usage_off(self):
+    """Tests that the query profile does not contain CPU metrics by default or when
+    disabled explicitly."""
+    query = "select count(*), sleep(1000) from functional.alltypes"
+    for query_opts in [None, {'resource_trace_ratio': 0.0}]:
+      profile = self.execute_query(query, query_opts).runtime_profile
+      # Assert that no CPU counters exist in the profile
+      for line in profile.splitlines():
+        assert not re.search("HostCpu.*Percentage", line)
+
+  def test_query_profile_contains_host_cpu_usage(self):
+    """Tests that the query profile contains various CPU metrics."""
+    query_opts = {'resource_trace_ratio': 1.0}
+    query = "select count(*), sleep(1000) from functional.alltypes"
+    profile = self.execute_query(query, query_opts).runtime_profile
+    # We check for 500ms because a query with 1s duration won't hit the 64 values limit.
+    expected_strs = ["HostCpuIoWaitPercentage (500.000ms):",
+                     "HostCpuSysPercentage (500.000ms):",
+                     "HostCpuUserPercentage (500.000ms):"]
+
+    # Assert that all expected counters exist in the profile.
+    for expected_str in expected_strs:
+      assert any(expected_str in line for line in profile.splitlines()), expected_str
+
+    # Check that there are some values for each counter.
+    for line in profile.splitlines():
+      if not any(key in line for key in expected_strs):
+        continue
+      values = line.split(':')[1].strip().split(',')
+      assert len(values) > 0
+
+  def _find_ts_counters_in_thrift_profile(self, profile, name):
+    """Finds all time series counters in 'profile' with a matching name."""
+    counters = []
+    for node in profile.nodes:
+      for counter in node.time_series_counters or []:
+        if counter.name == name:
+          counters.append(counter)
+    return counters
+
+  def _get_thrift_profile(self, query_id, timeout=MAX_THRIFT_PROFILE_WAIT_TIME_S):
+    """Downloads a thrift profile and asserts that a profile was retrieved within the
+       specified timeout. If you see unexpected timeouts, try running the calling test
+       serially."""
+    thrift_profile = self.impalad_test_service.get_thrift_profile(query_id,
+                                                                  timeout=timeout)
+    assert thrift_profile, "Debug thrift profile for query {0} not available in {1} \
+        seconds".format(query_id, timeout)
+    return thrift_profile
 
-class TestThriftProfile(ImpalaTestSuite):
-  @classmethod
-  def get_workload(self):
-    return 'functional-query'
+  @pytest.mark.execute_serially
+  def test_thrift_profile_contains_cpu_usage(self):
+    """Tests that the thrift profile contains a time series counter for CPU resource
+       usage."""
+    query_opts = {'resource_trace_ratio': 1.0}
+    result = self.execute_query("select sleep(2000)", query_opts)
+    thrift_profile = self._get_thrift_profile(result.query_id)
+
+    cpu_key = "HostCpuUserPercentage"
+    cpu_counters = self._find_ts_counters_in_thrift_profile(thrift_profile, cpu_key)
+    # The query will run on a single node, we will only find the counter once.
+    assert len(cpu_counters) == 1
+    cpu_counter = cpu_counters[0]
+    assert len(cpu_counter.values) > 0
 
-  # IMPALA-6399: Run this test serially to avoid a delay over the wait time in fetching
-  # the profile.
-  # This test needs to call self.client.close() to force computation of query end time,
-  # so it has to be in its own suite (IMPALA-6498).
   @pytest.mark.execute_serially
   def test_query_profile_thrift_timestamps(self):
     """Test that the query profile start and end time date-time strings have
     nanosecond precision. Nanosecond precision is expected by management API clients
     that consume Impala debug webpages."""
     query = "select sleep(5)"
-    handle = self.client.execute_async(query)
-    query_id = handle.get_handle().id
-    results = self.client.fetch(query, handle)
-    self.client.close()
-
-    start = time()
-    end = start + MAX_THRIFT_PROFILE_WAIT_TIME_S
-    while time() <= end:
-      # Sleep before trying to fetch the profile. This helps to prevent a warning when the
-      # profile is not yet available immediately. It also makes it less likely to
-      # introduce an error below in future changes by forgetting to sleep.
-      sleep(1)
-      tree = self.impalad_test_service.get_thrift_profile(query_id)
-      if not tree:
-        continue
-
-      # tree.nodes[1] corresponds to ClientRequestState::summary_profile_
-      # See be/src/service/client-request-state.[h|cc].
-      start_time = tree.nodes[1].info_strings["Start Time"]
-      end_time = tree.nodes[1].info_strings["End Time"]
-      # Start and End Times are of the form "2017-12-07 22:26:52.167711000"
-      start_time_sub_sec_str = start_time.split('.')[-1]
-      end_time_sub_sec_str = end_time.split('.')[-1]
-      if len(end_time_sub_sec_str) == 0:
-        elapsed = time() - start
-        logging.info("end_time_sub_sec_str hasn't shown up yet, elapsed=%d", elapsed)
-        continue
-
-      assert len(end_time_sub_sec_str) == 9, end_time
-      assert len(start_time_sub_sec_str) == 9, start_time
-      return True
-
-    # If we're here, we didn't get the final thrift profile from the debug web page.
-    # This could happen due to heavy system load. The test is then inconclusive.
-    # Log a message and fail this run.
+    result = self.execute_query(query)
 
-    dbg_str = "Debug thrift profile for query {0} not available in {1} seconds".format(
-        query_id, MAX_THRIFT_PROFILE_WAIT_TIME_S)
-    assert False, dbg_str
+    tree = self._get_thrift_profile(result.query_id)
+    # tree.nodes[1] corresponds to ClientRequestState::summary_profile_
+    # See be/src/service/client-request-state.[h|cc].
+    start_time = tree.nodes[1].info_strings["Start Time"]
+    end_time = tree.nodes[1].info_strings["End Time"]
+    # Start and End Times are of the form "2017-12-07 22:26:52.167711000"
+    start_time_sub_sec_str = start_time.split('.')[-1]
+    end_time_sub_sec_str = end_time.split('.')[-1]
+
+    assert len(end_time_sub_sec_str) == 9, end_time
+    assert len(start_time_sub_sec_str) == 9, start_time