You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/12/06 23:00:01 UTC

(impala) 02/02: IMPALA-12385: Enable Periodic metrics by default

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d75807a195273cdba29e33f00b7b6f9bee012f62
Author: Kurt Deschler <kd...@cloudera.com>
AuthorDate: Fri Aug 18 07:44:44 2023 -0500

    IMPALA-12385: Enable Periodic metrics by default
    
    This patch enables periodic metrics in query profiles by default and
    changes the metric collectors to be more suitable for mixed workloads.
    
    -Changed default of resource_trace_ratio to 1.
    -Changed profile metrics to use sampling counters which can automatically
     resize for long queries.
    -Reduced profile metric sampling interval to 50ms to support short-running
     queries.
    -Changed fragment metrics to use the same sampling interval as periodic
     metrics.
    -Added singleton PeriodicCounterUpdater and thread for updating system
     (KRPC) metrics at a different period than fragment metrics.
    -Added new flag periodic_system_counter_update_period_ms for configuring
     system metric update period. Default is 500ms.
    
    Example profile output:
    - HostCpuIoWaitPercentage (400.000ms): 1, 0, 2, 3, 4, 6, 5, 2, 1, ...
    - HostCpuSysPercentage (400.000ms): 1, 12, 10, 11, 11, 5, 3, 3, 3, ...
    - HostCpuUserPercentage (400.000ms): 8, 46, 39, 39, 39, 29, 22, 23, ...
    
    Testing:
    -Updated runtime-profile-test and test_observability.py for new defaults
    -Manual inspection of query profile metrics for long-running queries
    -Tested for performance regression using 50 concurrent TPCH Q1 and with
     periodic_counter_update_period_ms=1 to verify that the periodic update
     loop does not affect performance or become a bottleneck
    
    Change-Id: Ic8e5cbfd4b324081158574ceb8f4b3a062a69fd1
    Reviewed-on: http://gerrit.cloudera.org:8080/20377
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Michael Smith <mi...@cloudera.com>
---
 be/src/runtime/exec-env.cc               |   2 +-
 be/src/runtime/krpc-data-stream-recvr.cc |   6 +-
 be/src/runtime/query-state.cc            |  14 ++---
 be/src/util/periodic-counter-updater.cc  | 103 ++++++++++++++++++++-----------
 be/src/util/periodic-counter-updater.h   |  22 +++++--
 be/src/util/runtime-profile-counters.h   |  12 +++-
 be/src/util/runtime-profile-test.cc      |   3 +-
 be/src/util/runtime-profile.cc           |  16 +++--
 be/src/util/runtime-profile.h            |  11 +++-
 be/src/util/streaming-sampler.h          |   2 +-
 common/thrift/Query.thrift               |   2 +-
 tests/query_test/test_observability.py   |  37 +++++------
 12 files changed, 148 insertions(+), 82 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 8f2768e49..c31bd597f 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -637,7 +637,7 @@ void ExecEnv::InitSystemStateInfo() {
   system_state_info_.reset(new SystemStateInfo());
   PeriodicCounterUpdater::RegisterUpdateFunction([s = system_state_info_.get()]() {
     s->CaptureSystemStateSnapshot();
-  });
+  }, true);
 }
 
 Status ExecEnv::GetKuduClient(const vector<string>& master_addresses,
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index a3b804e7f..394fe64cb 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -707,7 +707,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
   // Initialize various counters for measuring dequeuing from queues.
   bytes_dequeued_counter_ =
       ADD_COUNTER(dequeue_profile_, "TotalBytesDequeued", TUnit::BYTES);
-  bytes_dequeued_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+  bytes_dequeued_time_series_counter_ = ADD_SYSTEM_TIME_SERIES_COUNTER(
       dequeue_profile_, "BytesDequeued", bytes_dequeued_counter_);
   queue_get_batch_timer_ = ADD_TIMER(dequeue_profile_, "TotalGetBatchTime");
   data_wait_timer_ =
@@ -719,7 +719,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
   // Initialize various counters for measuring enqueuing into queues.
   bytes_received_counter_ =
       ADD_COUNTER(enqueue_profile_, "TotalBytesReceived", TUnit::BYTES);
-  bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+  bytes_received_time_series_counter_ = ADD_SYSTEM_TIME_SERIES_COUNTER(
       enqueue_profile_, "BytesReceived", bytes_received_counter_);
   deserialize_row_batch_timer_ =
       ADD_TIMER(enqueue_profile_, "DeserializeRowBatchTime");
@@ -735,7 +735,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
       ADD_COUNTER(enqueue_profile_, "TotalRPCsDeferred", TUnit::UNIT);
   deferred_rpcs_time_series_counter_ =
       enqueue_profile_->AddSamplingTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT,
-      bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this));
+      bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this), true);
   total_has_deferred_rpcs_timer_ =
       ADD_TIMER(enqueue_profile_, "TotalHasDeferredRPCsTime");
   dispatch_timer_ =
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 31f833b8d..5738448e8 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -218,33 +218,33 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
   // Initialize resource tracking counters.
   if (query_ctx().trace_resource_usage) {
     SystemStateInfo* system_state_info = exec_env->system_state_info();
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostCpuUserPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
         return system_state_info->GetCpuUsageRatios().user;
         });
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostCpuSysPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
         return system_state_info->GetCpuUsageRatios().system;
         });
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostCpuIoWaitPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
         return system_state_info->GetCpuUsageRatios().iowait;
         });
     // Add network usage
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostNetworkRx", TUnit::BYTES_PER_SECOND, [system_state_info] () {
         return system_state_info->GetNetworkUsage().rx_rate;
         });
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostNetworkTx", TUnit::BYTES_PER_SECOND, [system_state_info] () {
         return system_state_info->GetNetworkUsage().tx_rate;
         });
     // Add disk stats
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostDiskReadThroughput", TUnit::BYTES_PER_SECOND, [system_state_info] () {
         return system_state_info->GetDiskStats().read_rate;
         });
-    host_profile_->AddChunkedTimeSeriesCounter(
+    host_profile_->AddSamplingTimeSeriesCounter(
         "HostDiskWriteThroughput", TUnit::BYTES_PER_SECOND, [system_state_info] () {
         return system_state_info->GetDiskStats().write_rate;
         });
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index 8bef6aa9a..edb8a4d14 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -26,25 +26,43 @@ namespace posix_time = boost::posix_time;
 using boost::get_system_time;
 using boost::system_time;
 
-// Period to update rate counters and sampling counters in ms.
-DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and"
-    " sampling counters in ms");
+// Period to update query profile rate counters and sampling counters in ms.
+DEFINE_int32(periodic_counter_update_period_ms, 50, "Period to update"
+    " query profile rate counters and sampling counters in ms");
+
+// Period to update system-level rate counters and sampling counters in ms.
+DEFINE_int32(periodic_system_counter_update_period_ms, 500, "Period to update"
+    " system-level rate counters and sampling counters in ms");
 
 namespace impala {
 
+// Updater for profile counters
 PeriodicCounterUpdater* PeriodicCounterUpdater::instance_ = nullptr;
 
+// Updater for system counters
+PeriodicCounterUpdater* PeriodicCounterUpdater::system_instance_ = nullptr;
+
 void PeriodicCounterUpdater::Init() {
-  DCHECK(instance_ == nullptr);
-  // Create the singleton, which will live until the process terminates.
-  instance_ = new PeriodicCounterUpdater;
+  DCHECK(instance_ == nullptr && system_instance_ == nullptr);
+  // Create two singletons, which will live until the process terminates.
+  instance_ = new PeriodicCounterUpdater(FLAGS_periodic_counter_update_period_ms);
+
   instance_->update_thread_.reset(
-      new thread(&PeriodicCounterUpdater::UpdateLoop, instance_));
+      new thread(boost::bind(&PeriodicCounterUpdater::UpdateLoop, instance_, instance_)));
+
+  system_instance_ =
+      new PeriodicCounterUpdater(FLAGS_periodic_system_counter_update_period_ms);
+
+  system_instance_->update_thread_.reset(
+      new thread(boost::bind(&PeriodicCounterUpdater::UpdateLoop, system_instance_,
+          system_instance_)));
+
 }
 
-void PeriodicCounterUpdater::RegisterUpdateFunction(UpdateFn update_fn) {
-  lock_guard<SpinLock> l(instance_->update_fns_lock_);
-  instance_->update_fns_.push_back(update_fn);
+void PeriodicCounterUpdater::RegisterUpdateFunction(UpdateFn update_fn, bool is_system) {
+  PeriodicCounterUpdater* instance = is_system ? system_instance_ : instance_;
+  lock_guard<SpinLock> l(instance->update_fns_lock_);
+  instance->update_fns_.push_back(update_fn);
 }
 
 void PeriodicCounterUpdater::RegisterPeriodicCounter(
@@ -52,6 +70,9 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
     RuntimeProfile::SampleFunction sample_fn,
     RuntimeProfile::Counter* dst_counter, PeriodicCounterType type) {
   DCHECK(src_counter == NULL || sample_fn == NULL);
+  DCHECK(src_counter == NULL || src_counter->GetIsSystem() == dst_counter->GetIsSystem());
+  PeriodicCounterUpdater* instance = dst_counter->GetIsSystem() ?
+      system_instance_ : instance_;
 
   switch (type) {
     case RATE_COUNTER: {
@@ -59,8 +80,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
       counter.src_counter = src_counter;
       counter.sample_fn = sample_fn;
       counter.elapsed_ms = 0;
-      lock_guard<SpinLock> ratelock(instance_->rate_lock_);
-      instance_->rate_counters_[dst_counter] = counter;
+      lock_guard<SpinLock> ratelock(instance->rate_lock_);
+      instance->rate_counters_[dst_counter] = counter;
       break;
     }
     case SAMPLING_COUNTER: {
@@ -69,8 +90,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
       counter.sample_fn = sample_fn;
       counter.num_sampled = 0;
       counter.total_sampled_value = 0;
-      lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
-      instance_->sampling_counters_[dst_counter] = counter;
+      lock_guard<SpinLock> samplinglock(instance->sampling_lock_);
+      instance->sampling_counters_[dst_counter] = counter;
       break;
     }
     default:
@@ -79,35 +100,43 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
 }
 
 void PeriodicCounterUpdater::StopRateCounter(RuntimeProfile::Counter* counter) {
-  lock_guard<SpinLock> ratelock(instance_->rate_lock_);
-  instance_->rate_counters_.erase(counter);
+  PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+      system_instance_ : instance_;
+  lock_guard<SpinLock> ratelock(instance->rate_lock_);
+  instance->rate_counters_.erase(counter);
 }
 
 void PeriodicCounterUpdater::StopSamplingCounter(RuntimeProfile::Counter* counter) {
-  lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
-  instance_->sampling_counters_.erase(counter);
+  PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+      system_instance_ : instance_;
+  lock_guard<SpinLock> samplinglock(instance->sampling_lock_);
+  instance->sampling_counters_.erase(counter);
 }
 
 void PeriodicCounterUpdater::RegisterBucketingCounters(
     RuntimeProfile::Counter* src_counter, vector<RuntimeProfile::Counter*>* buckets) {
+  PeriodicCounterUpdater* instance = src_counter->GetIsSystem() ?
+      system_instance_ : instance_;
   BucketCountersInfo info;
   info.src_counter = src_counter;
   info.num_sampled = 0;
-  lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
-  instance_->bucketing_counters_[buckets] = info;
+  lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_);
+  instance->bucketing_counters_[buckets] = info;
 }
 
 void PeriodicCounterUpdater::StopBucketingCounters(
-    vector<RuntimeProfile::Counter*>* buckets) {
+    vector<RuntimeProfile::Counter*>* buckets, bool is_system) {
   int64_t num_sampled = 0;
+  PeriodicCounterUpdater* instance = is_system ? system_instance_ : instance_;
   {
-    lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
+    lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_);
     BucketCountersMap::iterator itr =
-        instance_->bucketing_counters_.find(buckets);
+        instance->bucketing_counters_.find(buckets);
     // If not registered, we have nothing to do.
-    if (itr == instance_->bucketing_counters_.end()) return;
+    if (itr == instance->bucketing_counters_.end()) return;
+    DCHECK(is_system == itr->second.src_counter->GetIsSystem());
     num_sampled = itr->second.num_sampled;
-    instance_->bucketing_counters_.erase(itr);
+    instance->bucketing_counters_.erase(itr);
   }
 
   if (num_sampled > 0) {
@@ -120,20 +149,24 @@ void PeriodicCounterUpdater::StopBucketingCounters(
 
 void PeriodicCounterUpdater::RegisterTimeSeriesCounter(
     RuntimeProfile::TimeSeriesCounter* counter) {
-  lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
-  instance_->time_series_counters_.insert(counter);
+  PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+      system_instance_ : instance_;
+  lock_guard<SpinLock> timeserieslock(instance->time_series_lock_);
+  instance->time_series_counters_.insert(counter);
 }
 
 void PeriodicCounterUpdater::StopTimeSeriesCounter(
     RuntimeProfile::TimeSeriesCounter* counter) {
-  lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
-  instance_->time_series_counters_.erase(counter);
+  PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+     system_instance_ : instance_;
+  lock_guard<SpinLock> timeserieslock(instance->time_series_lock_);
+  instance->time_series_counters_.erase(counter);
 }
 
-void PeriodicCounterUpdater::UpdateLoop() {
+void PeriodicCounterUpdater::UpdateLoop(PeriodicCounterUpdater* instance) {
   while (true) {
     system_time before_time = get_system_time();
-    SleepForMs(FLAGS_periodic_counter_update_period_ms);
+    SleepForMs(update_period_);
     posix_time::time_duration elapsed = get_system_time() - before_time;
     int elapsed_ms = elapsed.total_milliseconds();
 
@@ -143,7 +176,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> ratelock(instance_->rate_lock_);
+      lock_guard<SpinLock> ratelock(instance->rate_lock_);
       for (RateCounterMap::iterator it = rate_counters_.begin();
            it != rate_counters_.end(); ++it) {
         it->second.elapsed_ms += elapsed_ms;
@@ -160,7 +193,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
+      lock_guard<SpinLock> samplinglock(instance->sampling_lock_);
       for (SamplingCounterMap::iterator it = sampling_counters_.begin();
            it != sampling_counters_.end(); ++it) {
         ++it->second.num_sampled;
@@ -179,7 +212,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
+      lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_);
       for (BucketCountersMap::iterator it = bucketing_counters_.begin();
            it != bucketing_counters_.end(); ++it) {
         int64_t val = it->second.src_counter->value();
@@ -190,7 +223,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
     }
 
     {
-      lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
+      lock_guard<SpinLock> timeserieslock(instance->time_series_lock_);
       for (TimeSeriesCounters::iterator it = time_series_counters_.begin();
            it != time_series_counters_.end(); ++it) {
         (*it)->AddSample(elapsed_ms);
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index 0f92070e9..6b3ef0eaf 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -39,6 +39,11 @@ namespace impala {
 /// future stale samples from polluting the useful values.
 class PeriodicCounterUpdater {
  public:
+
+  PeriodicCounterUpdater(const int32_t update_period)
+    : update_period_(update_period) {
+  }
+
   enum PeriodicCounterType {
     RATE_COUNTER = 0,
     SAMPLING_COUNTER,
@@ -52,7 +57,7 @@ class PeriodicCounterUpdater {
   /// Registers an update function that will be called before individual counters will be
   /// updated. This can be used to update some global metric once before reading it
   /// through individual counters.
-  static void RegisterUpdateFunction(UpdateFn update_fn);
+  static void RegisterUpdateFunction(UpdateFn update_fn, bool is_system);
 
   /// Registers a periodic counter to be updated by the update thread.
   /// Either sample_fn or dst_counter must be non-NULL.  When the periodic counter
@@ -80,7 +85,8 @@ class PeriodicCounterUpdater {
   /// convert the buckets from count to percentage. If not registered, has no effect.
   /// Perioidic counters are updated periodically so should be removed as soon as the
   /// underlying counter is no longer going to change.
-  static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets);
+  static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets,
+      bool is_system = false);
 
   /// Stops 'counter' from receiving any more samples.
   static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter* counter);
@@ -107,7 +113,7 @@ class PeriodicCounterUpdater {
 
   /// Loop for periodic counter update thread.  This thread wakes up once in a while
   /// and updates all the added rate counters and sampling counters.
-  [[noreturn]] void UpdateLoop();
+  [[noreturn]] void UpdateLoop(PeriodicCounterUpdater* instance);
 
   /// Thread performing asynchronous updates.
   boost::scoped_ptr<boost::thread> update_thread_;
@@ -149,8 +155,14 @@ class PeriodicCounterUpdater {
   typedef boost::unordered_set<RuntimeProfile::TimeSeriesCounter*> TimeSeriesCounters;
   TimeSeriesCounters time_series_counters_;
 
-  /// Singleton object that keeps track of all rate counters and the thread
-  /// for updating them.
+  /// Singleton object that keeps track of all profile rate counters and the thread
+  /// for updating them. Interval set by flag periodic_counter_update_period_ms.
   static PeriodicCounterUpdater* instance_;
+
+  /// Singleton object that keeps track of all system rate counters and the thread
+  /// for updating them. Interval set by flag periodic_system_counter_update_period_ms.
+  static PeriodicCounterUpdater* system_instance_;
+
+  int32_t update_period_;
 };
 }
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 316c2d083..8b12662e1 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -54,6 +54,8 @@ namespace impala {
   #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
   #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
       (profile)->AddSamplingTimeSeriesCounter(name, src_counter)
+  #define ADD_SYSTEM_TIME_SERIES_COUNTER(profile, name, src_counter) \
+      (profile)->AddSamplingTimeSeriesCounter(name, src_counter, true)
   #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
   #define ADD_SUMMARY_STATS_TIMER(profile, name) \
       (profile)->AddSummaryStatsCounter(name, TUnit::TIME_NS)
@@ -754,6 +756,9 @@ class RuntimeProfile::TimeSeriesCounter {
 
   TUnit::type unit() const { return unit_; }
 
+  void SetIsSystem() { is_system_ = true; }
+  bool GetIsSystem() const { return  is_system_; }
+
  private:
   friend class RuntimeProfile;
 
@@ -794,7 +799,7 @@ class RuntimeProfile::TimeSeriesCounter {
  protected:
   TimeSeriesCounter(const std::string& name, TUnit::type unit,
       SampleFunction fn = SampleFunction())
-    : name_(name), unit_(unit), sample_fn_(fn) {}
+    : name_(name), unit_(unit), sample_fn_(fn), is_system_(false) {}
 
   std::string name_;
   TUnit::type unit_;
@@ -802,6 +807,7 @@ class RuntimeProfile::TimeSeriesCounter {
   /// The number of samples that have been retrieved and cleared from this counter.
   int64_t previous_sample_count_ = 0;
   mutable SpinLock lock_;
+  bool is_system_;
 };
 
 typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
@@ -811,8 +817,8 @@ class RuntimeProfile::SamplingTimeSeriesCounter
   friend class RuntimeProfile;
 
   SamplingTimeSeriesCounter(
-      const std::string& name, TUnit::type unit, SampleFunction fn)
-    : TimeSeriesCounter(name, unit, fn) {}
+      const std::string& name, TUnit::type unit, SampleFunction fn, int initial_period)
+    : TimeSeriesCounter(name, unit, fn), samples_(initial_period) {}
 
   virtual void AddSampleLocked(int64_t sample, int ms_elapsed) override;
   virtual const int64_t* GetSamplesLocked( int* num_samples, int* period) const override;
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index bd5f4903f..3a446e897 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -1105,7 +1105,7 @@ void ValidateSampler(const StreamingSampler<int, 10>& sampler, int expected_num,
 }
 
 TEST(CountersTest, StreamingSampler) {
-  StreamingSampler<int, 10> sampler;
+  StreamingSampler<int, 10> sampler(500);
 
   int idx = 0;
   for (int i = 0; i < 3; ++i) {
@@ -1623,6 +1623,7 @@ TEST_P(TimeSeriesCounterResampleTest, TestPrettyPrint) {
   RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
 
   const TimeSeriesTestParam& param = GetParam();
+  FLAGS_periodic_counter_update_period_ms = 500;
   const int test_period = FLAGS_periodic_counter_update_period_ms;
 
   // Add a counter with a sample function that counts up, starting from 0.
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 9b7c2b407..0b5ec53ca 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -53,6 +53,7 @@
 
 DECLARE_int32(status_report_interval_ms);
 DECLARE_int32(periodic_counter_update_period_ms);
+DECLARE_int32(periodic_system_counter_update_period_ms);
 
 // This must be set on the coordinator to enable the alternative profile representation.
 // It should not be set on executors - the setting is sent to the executors by
@@ -2018,12 +2019,19 @@ RuntimeProfileBase::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
 }
 
 RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
-    const string& name, TUnit::type unit, SampleFunction fn) {
+    const string& name, TUnit::type unit, SampleFunction fn, bool is_system) {
   DCHECK(fn != nullptr);
   lock_guard<SpinLock> l(counter_map_lock_);
   TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
   if (it != time_series_counter_map_.end()) return it->second;
-  TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name, unit, fn));
+  int32_t update_interval = is_system ?
+      FLAGS_periodic_system_counter_update_period_ms :
+      FLAGS_periodic_counter_update_period_ms;
+  TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name,
+      unit, fn, update_interval));
+  if (is_system) {
+    counter->SetIsSystem();
+  }
   time_series_counter_map_[name] = counter;
   PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
   has_active_periodic_counters_ = true;
@@ -2031,10 +2039,10 @@ RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
 }
 
 RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
-    const string& name, Counter* src_counter) {
+    const string& name, Counter* src_counter, bool is_system) {
   DCHECK(src_counter != NULL);
   return AddSamplingTimeSeriesCounter(name, src_counter->unit(),
-      bind(&Counter::value, src_counter));
+      bind(&Counter::value, src_counter), is_system);
 }
 
 void RuntimeProfile::TimeSeriesCounter::AddSample(int ms_elapsed) {
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index c6b8fece6..419b8d916 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -99,7 +99,8 @@ class RuntimeProfileBase {
 
   class Counter {
    public:
-    Counter(TUnit::type unit, int64_t value = 0) : value_(value), unit_(unit) {}
+    Counter(TUnit::type unit, int64_t value = 0)
+      : value_(value), unit_(unit), is_system_(false) {}
     virtual ~Counter(){}
 
     virtual void Add(int64_t delta) {
@@ -144,11 +145,15 @@ class RuntimeProfileBase {
 
     TUnit::type unit() const { return unit_; }
 
+    void SetIsSystem() { is_system_ = true; }
+    bool GetIsSystem() const { return  is_system_; }
+
    protected:
     friend class RuntimeProfile;
 
     AtomicInt64 value_;
     TUnit::type unit_;
+    bool is_system_;
   };
 
   class AveragedCounter;
@@ -658,11 +663,11 @@ class RuntimeProfile : public RuntimeProfileBase {
   /// calling PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing.
   /// Note: these counters don't get merged (to make average profiles)
   TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name,
-      TUnit::type unit, SampleFunction sample_fn);
+      TUnit::type unit, SampleFunction sample_fn, bool is_system = false);
 
   /// Same as above except the samples are collected from 'src_counter'.
   TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name, Counter*
-      src_counter);
+      src_counter, bool is_system = false);
 
   /// Adds a chunked time series counter to the profile. This begins sampling immediately.
   /// This counter will collect new samples periodically by calling 'sample_fn()'. Samples
diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h
index b395ace5d..cbfd2f204 100644
--- a/be/src/util/streaming-sampler.h
+++ b/be/src/util/streaming-sampler.h
@@ -37,7 +37,7 @@ template<typename T, int MAX_SAMPLES>
 class StreamingSampler {
   static_assert(std::is_arithmetic<T>::value, "Numerical type required");
  public:
-  StreamingSampler(int initial_period = 500)
+  StreamingSampler(int initial_period)
     : samples_collected_(0) ,
       period_(initial_period),
       current_sample_sum_(0),
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 3808eb579..39ba7f896 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -371,7 +371,7 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift
   74: optional string client_identifier;
 
-  75: optional double resource_trace_ratio = 0;
+  75: optional double resource_trace_ratio = 1;
 
   // See comment in ImpalaService.thrift.
   // The default value is set to 3 as this is the default value of HDFS replicas.
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 67f5ca282..d6cd299c6 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -592,10 +592,10 @@ class TestObservability(ImpalaTestSuite):
     assert any(expected_str in line for line in profile.splitlines())
 
   def test_query_profile_host_resource_metrics_off(self):
-    """Tests that the query profile does not contain resource usage metrics by default or
+    """Tests that the query profile does not contain resource usage metrics
        when disabled explicitly."""
     query = "select count(*), sleep(1000) from functional.alltypes"
-    for query_opts in [None, {'resource_trace_ratio': 0.0}]:
+    for query_opts in [{'resource_trace_ratio': 0.0}]:
       profile = self.execute_query(query, query_opts).runtime_profile
       # Assert that no host resource counters exist in the profile
       for line in profile.splitlines():
@@ -604,23 +604,24 @@ class TestObservability(ImpalaTestSuite):
         assert not re.search("HostDiskReadThroughput", line)
 
   def test_query_profile_contains_host_resource_metrics(self):
-    """Tests that the query profile contains various CPU and network metrics."""
-    query_opts = {'resource_trace_ratio': 1.0}
+    """Tests that the query profile contains various CPU and network metrics
+       by default or when enabled explicitly."""
     query = "select count(*), sleep(1000) from functional.alltypes"
-    profile = self.execute_query(query, query_opts).runtime_profile
-    # We check for 500ms because a query with 1s duration won't hit the 64 values limit
-    # that would trigger resampling.
-    expected_strs = ["HostCpuIoWaitPercentage (500.000ms):",
-                     "HostCpuSysPercentage (500.000ms):",
-                     "HostCpuUserPercentage (500.000ms):",
-                     "HostNetworkRx (500.000ms):",
-                     "HostNetworkTx (500.000ms):",
-                     "HostDiskReadThroughput (500.000ms):",
-                     "HostDiskWriteThroughput (500.000ms):"]
-
-    # Assert that all expected counters exist in the profile.
-    for expected_str in expected_strs:
-      assert any(expected_str in line for line in profile.splitlines()), expected_str
+    for query_opts in [{}, {'resource_trace_ratio': 1.0}]:
+      profile = self.execute_query(query, query_opts).runtime_profile
+      # We check for 50ms because a query with 1s duration won't hit the 64 values limit
+      # that would trigger resampling.
+      expected_strs = ["HostCpuIoWaitPercentage (50.000ms):",
+                     "HostCpuSysPercentage (50.000ms):",
+                     "HostCpuUserPercentage (50.000ms):",
+                     "HostNetworkRx (50.000ms):",
+                     "HostNetworkTx (50.000ms):",
+                     "HostDiskReadThroughput (50.000ms):",
+                     "HostDiskWriteThroughput (50.000ms):"]
+
+      # Assert that all expected counters exist in the profile.
+      for expected_str in expected_strs:
+        assert any(expected_str in line for line in profile.splitlines()), expected_str
 
     # Check that there are some values for each counter.
     for line in profile.splitlines():