You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/12/06 23:00:01 UTC
(impala) 02/02: IMPALA-12385: Enable Periodic metrics by default
This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit d75807a195273cdba29e33f00b7b6f9bee012f62
Author: Kurt Deschler <kd...@cloudera.com>
AuthorDate: Fri Aug 18 07:44:44 2023 -0500
IMPALA-12385: Enable Periodic metrics by default
This patch enables periodic metrics in query profiles by default and
changes the metric collectors to be more suitable for mixed workloads.
-Changed default of resource_trace_ratio to 1.
-Changed profile metrics to use sampling counters which can automatically
resize for long queries.
-Reduced profile metric sampling interval to 50ms to support short-running
queries.
-Changed fragment metrics to use the same sampling interval as periodic
metrics.
-Added singleton PeriodicCounterUpdater and thread for updating system
(KRPC) metrics at a different period than fragment metrics.
-Added new flag periodic_system_counter_update_period_ms for configuring
system metric update period. Default is 500ms.
Example profile output:
- HostCpuIoWaitPercentage (400.000ms): 1, 0, 2, 3, 4, 6, 5, 2, 1, ...
- HostCpuSysPercentage (400.000ms): 1, 12, 10, 11, 11, 5, 3, 3, 3, ...
- HostCpuUserPercentage (400.000ms): 8, 46, 39, 39, 39, 29, 22, 23, ...
Testing:
-Updated runtime-profile-test and test_observability.py for new defaults
-Manual inspection of query profile metrics for long-running queries
-Tested for performance regression using 50 concurrent TPCH Q1 and with
periodic_counter_update_period_ms=1 to verify that the periodic update
loop does not affect performance or become a bottleneck
Change-Id: Ic8e5cbfd4b324081158574ceb8f4b3a062a69fd1
Reviewed-on: http://gerrit.cloudera.org:8080/20377
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Michael Smith <mi...@cloudera.com>
---
be/src/runtime/exec-env.cc | 2 +-
be/src/runtime/krpc-data-stream-recvr.cc | 6 +-
be/src/runtime/query-state.cc | 14 ++---
be/src/util/periodic-counter-updater.cc | 103 ++++++++++++++++++++-----------
be/src/util/periodic-counter-updater.h | 22 +++++--
be/src/util/runtime-profile-counters.h | 12 +++-
be/src/util/runtime-profile-test.cc | 3 +-
be/src/util/runtime-profile.cc | 16 +++--
be/src/util/runtime-profile.h | 11 +++-
be/src/util/streaming-sampler.h | 2 +-
common/thrift/Query.thrift | 2 +-
tests/query_test/test_observability.py | 37 +++++------
12 files changed, 148 insertions(+), 82 deletions(-)
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 8f2768e49..c31bd597f 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -637,7 +637,7 @@ void ExecEnv::InitSystemStateInfo() {
system_state_info_.reset(new SystemStateInfo());
PeriodicCounterUpdater::RegisterUpdateFunction([s = system_state_info_.get()]() {
s->CaptureSystemStateSnapshot();
- });
+ }, true);
}
Status ExecEnv::GetKuduClient(const vector<string>& master_addresses,
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index a3b804e7f..394fe64cb 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -707,7 +707,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
// Initialize various counters for measuring dequeuing from queues.
bytes_dequeued_counter_ =
ADD_COUNTER(dequeue_profile_, "TotalBytesDequeued", TUnit::BYTES);
- bytes_dequeued_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+ bytes_dequeued_time_series_counter_ = ADD_SYSTEM_TIME_SERIES_COUNTER(
dequeue_profile_, "BytesDequeued", bytes_dequeued_counter_);
queue_get_batch_timer_ = ADD_TIMER(dequeue_profile_, "TotalGetBatchTime");
data_wait_timer_ =
@@ -719,7 +719,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
// Initialize various counters for measuring enqueuing into queues.
bytes_received_counter_ =
ADD_COUNTER(enqueue_profile_, "TotalBytesReceived", TUnit::BYTES);
- bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER(
+ bytes_received_time_series_counter_ = ADD_SYSTEM_TIME_SERIES_COUNTER(
enqueue_profile_, "BytesReceived", bytes_received_counter_);
deserialize_row_batch_timer_ =
ADD_TIMER(enqueue_profile_, "DeserializeRowBatchTime");
@@ -735,7 +735,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
ADD_COUNTER(enqueue_profile_, "TotalRPCsDeferred", TUnit::UNIT);
deferred_rpcs_time_series_counter_ =
enqueue_profile_->AddSamplingTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT,
- bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this));
+ bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this), true);
total_has_deferred_rpcs_timer_ =
ADD_TIMER(enqueue_profile_, "TotalHasDeferredRPCsTime");
dispatch_timer_ =
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 31f833b8d..5738448e8 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -218,33 +218,33 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
// Initialize resource tracking counters.
if (query_ctx().trace_resource_usage) {
SystemStateInfo* system_state_info = exec_env->system_state_info();
- host_profile_->AddChunkedTimeSeriesCounter(
+ host_profile_->AddSamplingTimeSeriesCounter(
"HostCpuUserPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
return system_state_info->GetCpuUsageRatios().user;
});
- host_profile_->AddChunkedTimeSeriesCounter(
+ host_profile_->AddSamplingTimeSeriesCounter(
"HostCpuSysPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
return system_state_info->GetCpuUsageRatios().system;
});
- host_profile_->AddChunkedTimeSeriesCounter(
+ host_profile_->AddSamplingTimeSeriesCounter(
"HostCpuIoWaitPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
return system_state_info->GetCpuUsageRatios().iowait;
});
// Add network usage
- host_profile_->AddChunkedTimeSeriesCounter(
+ host_profile_->AddSamplingTimeSeriesCounter(
"HostNetworkRx", TUnit::BYTES_PER_SECOND, [system_state_info] () {
return system_state_info->GetNetworkUsage().rx_rate;
});
- host_profile_->AddChunkedTimeSeriesCounter(
+ host_profile_->AddSamplingTimeSeriesCounter(
"HostNetworkTx", TUnit::BYTES_PER_SECOND, [system_state_info] () {
return system_state_info->GetNetworkUsage().tx_rate;
});
// Add disk stats
- host_profile_->AddChunkedTimeSeriesCounter(
+ host_profile_->AddSamplingTimeSeriesCounter(
"HostDiskReadThroughput", TUnit::BYTES_PER_SECOND, [system_state_info] () {
return system_state_info->GetDiskStats().read_rate;
});
- host_profile_->AddChunkedTimeSeriesCounter(
+ host_profile_->AddSamplingTimeSeriesCounter(
"HostDiskWriteThroughput", TUnit::BYTES_PER_SECOND, [system_state_info] () {
return system_state_info->GetDiskStats().write_rate;
});
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index 8bef6aa9a..edb8a4d14 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -26,25 +26,43 @@ namespace posix_time = boost::posix_time;
using boost::get_system_time;
using boost::system_time;
-// Period to update rate counters and sampling counters in ms.
-DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and"
- " sampling counters in ms");
+// Period to update query profile rate counters and sampling counters in ms.
+DEFINE_int32(periodic_counter_update_period_ms, 50, "Period to update"
+ " query profile rate counters and sampling counters in ms");
+
+// Period to update system-level rate counters and sampling counters in ms.
+DEFINE_int32(periodic_system_counter_update_period_ms, 500, "Period to update"
+ " system-level rate counters and sampling counters in ms");
namespace impala {
+// Updater for profile counters
PeriodicCounterUpdater* PeriodicCounterUpdater::instance_ = nullptr;
+// Updater for system counters
+PeriodicCounterUpdater* PeriodicCounterUpdater::system_instance_ = nullptr;
+
void PeriodicCounterUpdater::Init() {
- DCHECK(instance_ == nullptr);
- // Create the singleton, which will live until the process terminates.
- instance_ = new PeriodicCounterUpdater;
+ DCHECK(instance_ == nullptr && system_instance_ == nullptr);
+ // Create two singletons, which will live until the process terminates.
+ instance_ = new PeriodicCounterUpdater(FLAGS_periodic_counter_update_period_ms);
+
instance_->update_thread_.reset(
- new thread(&PeriodicCounterUpdater::UpdateLoop, instance_));
+ new thread(boost::bind(&PeriodicCounterUpdater::UpdateLoop, instance_, instance_)));
+
+ system_instance_ =
+ new PeriodicCounterUpdater(FLAGS_periodic_system_counter_update_period_ms);
+
+ system_instance_->update_thread_.reset(
+ new thread(boost::bind(&PeriodicCounterUpdater::UpdateLoop, system_instance_,
+ system_instance_)));
+
}
-void PeriodicCounterUpdater::RegisterUpdateFunction(UpdateFn update_fn) {
- lock_guard<SpinLock> l(instance_->update_fns_lock_);
- instance_->update_fns_.push_back(update_fn);
+void PeriodicCounterUpdater::RegisterUpdateFunction(UpdateFn update_fn, bool is_system) {
+ PeriodicCounterUpdater* instance = is_system ? system_instance_ : instance_;
+ lock_guard<SpinLock> l(instance->update_fns_lock_);
+ instance->update_fns_.push_back(update_fn);
}
void PeriodicCounterUpdater::RegisterPeriodicCounter(
@@ -52,6 +70,9 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
RuntimeProfile::SampleFunction sample_fn,
RuntimeProfile::Counter* dst_counter, PeriodicCounterType type) {
DCHECK(src_counter == NULL || sample_fn == NULL);
+ DCHECK(src_counter == NULL || src_counter->GetIsSystem() == dst_counter->GetIsSystem());
+ PeriodicCounterUpdater* instance = dst_counter->GetIsSystem() ?
+ system_instance_ : instance_;
switch (type) {
case RATE_COUNTER: {
@@ -59,8 +80,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
counter.src_counter = src_counter;
counter.sample_fn = sample_fn;
counter.elapsed_ms = 0;
- lock_guard<SpinLock> ratelock(instance_->rate_lock_);
- instance_->rate_counters_[dst_counter] = counter;
+ lock_guard<SpinLock> ratelock(instance->rate_lock_);
+ instance->rate_counters_[dst_counter] = counter;
break;
}
case SAMPLING_COUNTER: {
@@ -69,8 +90,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
counter.sample_fn = sample_fn;
counter.num_sampled = 0;
counter.total_sampled_value = 0;
- lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
- instance_->sampling_counters_[dst_counter] = counter;
+ lock_guard<SpinLock> samplinglock(instance->sampling_lock_);
+ instance->sampling_counters_[dst_counter] = counter;
break;
}
default:
@@ -79,35 +100,43 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter(
}
void PeriodicCounterUpdater::StopRateCounter(RuntimeProfile::Counter* counter) {
- lock_guard<SpinLock> ratelock(instance_->rate_lock_);
- instance_->rate_counters_.erase(counter);
+ PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+ system_instance_ : instance_;
+ lock_guard<SpinLock> ratelock(instance->rate_lock_);
+ instance->rate_counters_.erase(counter);
}
void PeriodicCounterUpdater::StopSamplingCounter(RuntimeProfile::Counter* counter) {
- lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
- instance_->sampling_counters_.erase(counter);
+ PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+ system_instance_ : instance_;
+ lock_guard<SpinLock> samplinglock(instance->sampling_lock_);
+ instance->sampling_counters_.erase(counter);
}
void PeriodicCounterUpdater::RegisterBucketingCounters(
RuntimeProfile::Counter* src_counter, vector<RuntimeProfile::Counter*>* buckets) {
+ PeriodicCounterUpdater* instance = src_counter->GetIsSystem() ?
+ system_instance_ : instance_;
BucketCountersInfo info;
info.src_counter = src_counter;
info.num_sampled = 0;
- lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
- instance_->bucketing_counters_[buckets] = info;
+ lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_);
+ instance->bucketing_counters_[buckets] = info;
}
void PeriodicCounterUpdater::StopBucketingCounters(
- vector<RuntimeProfile::Counter*>* buckets) {
+ vector<RuntimeProfile::Counter*>* buckets, bool is_system) {
int64_t num_sampled = 0;
+ PeriodicCounterUpdater* instance = is_system ? system_instance_ : instance_;
{
- lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
+ lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_);
BucketCountersMap::iterator itr =
- instance_->bucketing_counters_.find(buckets);
+ instance->bucketing_counters_.find(buckets);
// If not registered, we have nothing to do.
- if (itr == instance_->bucketing_counters_.end()) return;
+ if (itr == instance->bucketing_counters_.end()) return;
+ DCHECK(is_system == itr->second.src_counter->GetIsSystem());
num_sampled = itr->second.num_sampled;
- instance_->bucketing_counters_.erase(itr);
+ instance->bucketing_counters_.erase(itr);
}
if (num_sampled > 0) {
@@ -120,20 +149,24 @@ void PeriodicCounterUpdater::StopBucketingCounters(
void PeriodicCounterUpdater::RegisterTimeSeriesCounter(
RuntimeProfile::TimeSeriesCounter* counter) {
- lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
- instance_->time_series_counters_.insert(counter);
+ PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+ system_instance_ : instance_;
+ lock_guard<SpinLock> timeserieslock(instance->time_series_lock_);
+ instance->time_series_counters_.insert(counter);
}
void PeriodicCounterUpdater::StopTimeSeriesCounter(
RuntimeProfile::TimeSeriesCounter* counter) {
- lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
- instance_->time_series_counters_.erase(counter);
+ PeriodicCounterUpdater* instance = counter->GetIsSystem() ?
+ system_instance_ : instance_;
+ lock_guard<SpinLock> timeserieslock(instance->time_series_lock_);
+ instance->time_series_counters_.erase(counter);
}
-void PeriodicCounterUpdater::UpdateLoop() {
+void PeriodicCounterUpdater::UpdateLoop(PeriodicCounterUpdater* instance) {
while (true) {
system_time before_time = get_system_time();
- SleepForMs(FLAGS_periodic_counter_update_period_ms);
+ SleepForMs(update_period_);
posix_time::time_duration elapsed = get_system_time() - before_time;
int elapsed_ms = elapsed.total_milliseconds();
@@ -143,7 +176,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
}
{
- lock_guard<SpinLock> ratelock(instance_->rate_lock_);
+ lock_guard<SpinLock> ratelock(instance->rate_lock_);
for (RateCounterMap::iterator it = rate_counters_.begin();
it != rate_counters_.end(); ++it) {
it->second.elapsed_ms += elapsed_ms;
@@ -160,7 +193,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
}
{
- lock_guard<SpinLock> samplinglock(instance_->sampling_lock_);
+ lock_guard<SpinLock> samplinglock(instance->sampling_lock_);
for (SamplingCounterMap::iterator it = sampling_counters_.begin();
it != sampling_counters_.end(); ++it) {
++it->second.num_sampled;
@@ -179,7 +212,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
}
{
- lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_);
+ lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_);
for (BucketCountersMap::iterator it = bucketing_counters_.begin();
it != bucketing_counters_.end(); ++it) {
int64_t val = it->second.src_counter->value();
@@ -190,7 +223,7 @@ void PeriodicCounterUpdater::UpdateLoop() {
}
{
- lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_);
+ lock_guard<SpinLock> timeserieslock(instance->time_series_lock_);
for (TimeSeriesCounters::iterator it = time_series_counters_.begin();
it != time_series_counters_.end(); ++it) {
(*it)->AddSample(elapsed_ms);
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index 0f92070e9..6b3ef0eaf 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -39,6 +39,11 @@ namespace impala {
/// future stale samples from polluting the useful values.
class PeriodicCounterUpdater {
public:
+
+ PeriodicCounterUpdater(const int32_t update_period)
+ : update_period_(update_period) {
+ }
+
enum PeriodicCounterType {
RATE_COUNTER = 0,
SAMPLING_COUNTER,
@@ -52,7 +57,7 @@ class PeriodicCounterUpdater {
/// Registers an update function that will be called before individual counters will be
/// updated. This can be used to update some global metric once before reading it
/// through individual counters.
- static void RegisterUpdateFunction(UpdateFn update_fn);
+ static void RegisterUpdateFunction(UpdateFn update_fn, bool is_system);
/// Registers a periodic counter to be updated by the update thread.
/// Either sample_fn or dst_counter must be non-NULL. When the periodic counter
@@ -80,7 +85,8 @@ class PeriodicCounterUpdater {
/// convert the buckets from count to percentage. If not registered, has no effect.
/// Perioidic counters are updated periodically so should be removed as soon as the
/// underlying counter is no longer going to change.
- static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets);
+ static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets,
+ bool is_system = false);
/// Stops 'counter' from receiving any more samples.
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter* counter);
@@ -107,7 +113,7 @@ class PeriodicCounterUpdater {
/// Loop for periodic counter update thread. This thread wakes up once in a while
/// and updates all the added rate counters and sampling counters.
- [[noreturn]] void UpdateLoop();
+ [[noreturn]] void UpdateLoop(PeriodicCounterUpdater* instance);
/// Thread performing asynchronous updates.
boost::scoped_ptr<boost::thread> update_thread_;
@@ -149,8 +155,14 @@ class PeriodicCounterUpdater {
typedef boost::unordered_set<RuntimeProfile::TimeSeriesCounter*> TimeSeriesCounters;
TimeSeriesCounters time_series_counters_;
- /// Singleton object that keeps track of all rate counters and the thread
- /// for updating them.
+ /// Singleton object that keeps track of all profile rate counters and the thread
+ /// for updating them. Interval set by flag periodic_counter_update_period_ms.
static PeriodicCounterUpdater* instance_;
+
+ /// Singleton object that keeps track of all system rate counters and the thread
+ /// for updating them. Interval set by flag periodic_system_counter_update_period_ms.
+ static PeriodicCounterUpdater* system_instance_;
+
+ int32_t update_period_;
};
}
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 316c2d083..8b12662e1 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -54,6 +54,8 @@ namespace impala {
#define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
#define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
(profile)->AddSamplingTimeSeriesCounter(name, src_counter)
+ #define ADD_SYSTEM_TIME_SERIES_COUNTER(profile, name, src_counter) \
+ (profile)->AddSamplingTimeSeriesCounter(name, src_counter, true)
#define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
#define ADD_SUMMARY_STATS_TIMER(profile, name) \
(profile)->AddSummaryStatsCounter(name, TUnit::TIME_NS)
@@ -754,6 +756,9 @@ class RuntimeProfile::TimeSeriesCounter {
TUnit::type unit() const { return unit_; }
+ void SetIsSystem() { is_system_ = true; }
+ bool GetIsSystem() const { return is_system_; }
+
private:
friend class RuntimeProfile;
@@ -794,7 +799,7 @@ class RuntimeProfile::TimeSeriesCounter {
protected:
TimeSeriesCounter(const std::string& name, TUnit::type unit,
SampleFunction fn = SampleFunction())
- : name_(name), unit_(unit), sample_fn_(fn) {}
+ : name_(name), unit_(unit), sample_fn_(fn), is_system_(false) {}
std::string name_;
TUnit::type unit_;
@@ -802,6 +807,7 @@ class RuntimeProfile::TimeSeriesCounter {
/// The number of samples that have been retrieved and cleared from this counter.
int64_t previous_sample_count_ = 0;
mutable SpinLock lock_;
+ bool is_system_;
};
typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
@@ -811,8 +817,8 @@ class RuntimeProfile::SamplingTimeSeriesCounter
friend class RuntimeProfile;
SamplingTimeSeriesCounter(
- const std::string& name, TUnit::type unit, SampleFunction fn)
- : TimeSeriesCounter(name, unit, fn) {}
+ const std::string& name, TUnit::type unit, SampleFunction fn, int initial_period)
+ : TimeSeriesCounter(name, unit, fn), samples_(initial_period) {}
virtual void AddSampleLocked(int64_t sample, int ms_elapsed) override;
virtual const int64_t* GetSamplesLocked( int* num_samples, int* period) const override;
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index bd5f4903f..3a446e897 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -1105,7 +1105,7 @@ void ValidateSampler(const StreamingSampler<int, 10>& sampler, int expected_num,
}
TEST(CountersTest, StreamingSampler) {
- StreamingSampler<int, 10> sampler;
+ StreamingSampler<int, 10> sampler(500);
int idx = 0;
for (int i = 0; i < 3; ++i) {
@@ -1623,6 +1623,7 @@ TEST_P(TimeSeriesCounterResampleTest, TestPrettyPrint) {
RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
const TimeSeriesTestParam& param = GetParam();
+ FLAGS_periodic_counter_update_period_ms = 500;
const int test_period = FLAGS_periodic_counter_update_period_ms;
// Add a counter with a sample function that counts up, starting from 0.
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 9b7c2b407..0b5ec53ca 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -53,6 +53,7 @@
DECLARE_int32(status_report_interval_ms);
DECLARE_int32(periodic_counter_update_period_ms);
+DECLARE_int32(periodic_system_counter_update_period_ms);
// This must be set on the coordinator to enable the alternative profile representation.
// It should not be set on executors - the setting is sent to the executors by
@@ -2018,12 +2019,19 @@ RuntimeProfileBase::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
}
RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
- const string& name, TUnit::type unit, SampleFunction fn) {
+ const string& name, TUnit::type unit, SampleFunction fn, bool is_system) {
DCHECK(fn != nullptr);
lock_guard<SpinLock> l(counter_map_lock_);
TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
if (it != time_series_counter_map_.end()) return it->second;
- TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name, unit, fn));
+ int32_t update_interval = is_system ?
+ FLAGS_periodic_system_counter_update_period_ms :
+ FLAGS_periodic_counter_update_period_ms;
+ TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name,
+ unit, fn, update_interval));
+ if (is_system) {
+ counter->SetIsSystem();
+ }
time_series_counter_map_[name] = counter;
PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
has_active_periodic_counters_ = true;
@@ -2031,10 +2039,10 @@ RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
}
RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
- const string& name, Counter* src_counter) {
+ const string& name, Counter* src_counter, bool is_system) {
DCHECK(src_counter != NULL);
return AddSamplingTimeSeriesCounter(name, src_counter->unit(),
- bind(&Counter::value, src_counter));
+ bind(&Counter::value, src_counter), is_system);
}
void RuntimeProfile::TimeSeriesCounter::AddSample(int ms_elapsed) {
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index c6b8fece6..419b8d916 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -99,7 +99,8 @@ class RuntimeProfileBase {
class Counter {
public:
- Counter(TUnit::type unit, int64_t value = 0) : value_(value), unit_(unit) {}
+ Counter(TUnit::type unit, int64_t value = 0)
+ : value_(value), unit_(unit), is_system_(false) {}
virtual ~Counter(){}
virtual void Add(int64_t delta) {
@@ -144,11 +145,15 @@ class RuntimeProfileBase {
TUnit::type unit() const { return unit_; }
+ void SetIsSystem() { is_system_ = true; }
+ bool GetIsSystem() const { return is_system_; }
+
protected:
friend class RuntimeProfile;
AtomicInt64 value_;
TUnit::type unit_;
+ bool is_system_;
};
class AveragedCounter;
@@ -658,11 +663,11 @@ class RuntimeProfile : public RuntimeProfileBase {
/// calling PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing.
/// Note: these counters don't get merged (to make average profiles)
TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name,
- TUnit::type unit, SampleFunction sample_fn);
+ TUnit::type unit, SampleFunction sample_fn, bool is_system = false);
/// Same as above except the samples are collected from 'src_counter'.
TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name, Counter*
- src_counter);
+ src_counter, bool is_system = false);
/// Adds a chunked time series counter to the profile. This begins sampling immediately.
/// This counter will collect new samples periodically by calling 'sample_fn()'. Samples
diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h
index b395ace5d..cbfd2f204 100644
--- a/be/src/util/streaming-sampler.h
+++ b/be/src/util/streaming-sampler.h
@@ -37,7 +37,7 @@ template<typename T, int MAX_SAMPLES>
class StreamingSampler {
static_assert(std::is_arithmetic<T>::value, "Numerical type required");
public:
- StreamingSampler(int initial_period = 500)
+ StreamingSampler(int initial_period)
: samples_collected_(0) ,
period_(initial_period),
current_sample_sum_(0),
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 3808eb579..39ba7f896 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -371,7 +371,7 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
74: optional string client_identifier;
- 75: optional double resource_trace_ratio = 0;
+ 75: optional double resource_trace_ratio = 1;
// See comment in ImpalaService.thrift.
// The default value is set to 3 as this is the default value of HDFS replicas.
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 67f5ca282..d6cd299c6 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -592,10 +592,10 @@ class TestObservability(ImpalaTestSuite):
assert any(expected_str in line for line in profile.splitlines())
def test_query_profile_host_resource_metrics_off(self):
- """Tests that the query profile does not contain resource usage metrics by default or
+ """Tests that the query profile does not contain resource usage metrics
when disabled explicitly."""
query = "select count(*), sleep(1000) from functional.alltypes"
- for query_opts in [None, {'resource_trace_ratio': 0.0}]:
+ for query_opts in [{'resource_trace_ratio': 0.0}]:
profile = self.execute_query(query, query_opts).runtime_profile
# Assert that no host resource counters exist in the profile
for line in profile.splitlines():
@@ -604,23 +604,24 @@ class TestObservability(ImpalaTestSuite):
assert not re.search("HostDiskReadThroughput", line)
def test_query_profile_contains_host_resource_metrics(self):
- """Tests that the query profile contains various CPU and network metrics."""
- query_opts = {'resource_trace_ratio': 1.0}
+ """Tests that the query profile contains various CPU and network metrics
+ by default or when enabled explicitly."""
query = "select count(*), sleep(1000) from functional.alltypes"
- profile = self.execute_query(query, query_opts).runtime_profile
- # We check for 500ms because a query with 1s duration won't hit the 64 values limit
- # that would trigger resampling.
- expected_strs = ["HostCpuIoWaitPercentage (500.000ms):",
- "HostCpuSysPercentage (500.000ms):",
- "HostCpuUserPercentage (500.000ms):",
- "HostNetworkRx (500.000ms):",
- "HostNetworkTx (500.000ms):",
- "HostDiskReadThroughput (500.000ms):",
- "HostDiskWriteThroughput (500.000ms):"]
-
- # Assert that all expected counters exist in the profile.
- for expected_str in expected_strs:
- assert any(expected_str in line for line in profile.splitlines()), expected_str
+ for query_opts in [{}, {'resource_trace_ratio': 1.0}]:
+ profile = self.execute_query(query, query_opts).runtime_profile
+ # We check for 50ms because a query with 1s duration won't hit the 64 values limit
+ # that would trigger resampling.
+ expected_strs = ["HostCpuIoWaitPercentage (50.000ms):",
+ "HostCpuSysPercentage (50.000ms):",
+ "HostCpuUserPercentage (50.000ms):",
+ "HostNetworkRx (50.000ms):",
+ "HostNetworkTx (50.000ms):",
+ "HostDiskReadThroughput (50.000ms):",
+ "HostDiskWriteThroughput (50.000ms):"]
+
+ # Assert that all expected counters exist in the profile.
+ for expected_str in expected_strs:
+ assert any(expected_str in line for line in profile.splitlines()), expected_str
# Check that there are some values for each counter.
for line in profile.splitlines():