You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/01/18 23:55:54 UTC
[3/3] impala git commit: IMPALA-2397: Use atomics for IntGauge and
IntCounter
IMPALA-2397: Use atomics for IntGauge and IntCounter
This change removes the spinlock in IntGauge and IntCounter
and uses AtomicInt64 instead. As shown in IMPALA-2397, multiple
threads can be contending for the spinlocks of some global metrics
under concurrent queries.
In this change, SimpleMetric is also renamed to ScalarMetric
and broken up into two subclasses:
- LockedMetric:
  - a value store for any primitive type (int, float, string, etc.).
  - atomic read and write via GetValue() and SetValue() respectively.
- AtomicMetric:
  - the basis of IntGauge and IntCounter. Supports atomic increment
    of the metric value via the Increment() interface.
  - atomic read and write via GetValue() and SetValue() respectively.
  - only supports the int64_t type.
Change-Id: I48dfa5443cd771916b53541a0ffeaf1bcc7e7606
Reviewed-on: http://gerrit.cloudera.org:8080/9012
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e714f2b3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e714f2b3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e714f2b3
Branch: refs/heads/master
Commit: e714f2b33c5b64d5680dbc15e166759930f04560
Parents: b3d38b5
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Jan 10 19:28:09 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 18 23:31:52 2018 +0000
----------------------------------------------------------------------
be/src/exec/external-data-source-executor.cc | 8 +-
be/src/rpc/TAcceptQueueServer.cpp | 2 +-
be/src/rpc/thrift-server.cc | 4 +-
be/src/runtime/client-cache.cc | 6 +-
be/src/runtime/data-stream-mgr.cc | 6 +-
be/src/runtime/exec-env.cc | 10 +-
be/src/runtime/io/scan-range.cc | 4 +-
be/src/runtime/krpc-data-stream-mgr.cc | 6 +-
be/src/runtime/mem-tracker-test.cc | 4 +-
be/src/runtime/mem-tracker.cc | 10 +-
be/src/runtime/mem-tracker.h | 4 +-
be/src/runtime/query-exec-mgr.cc | 2 +-
be/src/runtime/query-state.cc | 6 +-
be/src/runtime/tmp-file-mgr-test.cc | 2 +-
be/src/runtime/tmp-file-mgr.cc | 4 +-
be/src/scheduling/admission-controller.cc | 58 +++---
be/src/scheduling/scheduler.cc | 9 +-
be/src/service/impala-server.cc | 10 +-
be/src/service/session-expiry-test.cc | 12 +-
be/src/statestore/statestore-subscriber.cc | 14 +-
be/src/statestore/statestore.cc | 11 +-
be/src/util/common-metrics.cc | 2 +-
be/src/util/default-path-handlers.cc | 2 +-
be/src/util/impalad-metrics.cc | 66 +++----
be/src/util/memory-metrics.cc | 99 +++++-----
be/src/util/memory-metrics.h | 42 ++--
be/src/util/metrics-test.cc | 44 ++---
be/src/util/metrics.h | 222 ++++++++++++----------
be/src/util/thread.cc | 8 +-
common/thrift/metrics.json | 2 +-
30 files changed, 350 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc
index 7c810c6..7c54f39 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -76,9 +76,9 @@ class ExternalDataSourceExecutor::JniState {
"getNumClassCacheMisses", "()J");
RETURN_ERROR_IF_EXC(env);
- num_class_cache_hits_ = metrics->AddCounter<int64_t>(
+ num_class_cache_hits_ = metrics->AddCounter(
"external-data-source.class-cache.hits", 0);
- num_class_cache_misses_ = metrics->AddCounter<int64_t>(
+ num_class_cache_misses_ = metrics->AddCounter(
"external-data-source.class-cache.misses", 0);
return Status::OK();
}
@@ -92,11 +92,11 @@ class ExternalDataSourceExecutor::JniState {
int64_t num_cache_hits = env->CallStaticLongMethod(executor_class_,
get_num_cache_hits_id_);
RETURN_ERROR_IF_EXC(env);
- num_class_cache_hits_->set_value(num_cache_hits);
+ num_class_cache_hits_->SetValue(num_cache_hits);
int64_t num_cache_misses = env->CallStaticLongMethod(executor_class_,
get_num_cache_misses_id_);
RETURN_ERROR_IF_EXC(env);
- num_class_cache_misses_->set_value(num_cache_misses);
+ num_class_cache_misses_->SetValue(num_cache_misses);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 8a398a2..5c1b1da 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -286,7 +286,7 @@ void TAcceptQueueServer::InitMetrics(MetricGroup* metrics, const string& key_pre
DCHECK(metrics != NULL);
stringstream queue_size_ss;
queue_size_ss << key_prefix << ".connection-setup-queue-size";
- queue_size_metric_ = metrics->AddGauge<int64_t>(queue_size_ss.str(), 0);
+ queue_size_metric_ = metrics->AddGauge(queue_size_ss.str(), 0);
metrics_enabled_ = true;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index ab51315..eaca699 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -342,10 +342,10 @@ ThriftServer::ThriftServer(const string& name,
metrics_enabled_ = true;
stringstream count_ss;
count_ss << "impala.thrift-server." << name << ".connections-in-use";
- num_current_connections_metric_ = metrics->AddGauge<int64_t>(count_ss.str(), 0);
+ num_current_connections_metric_ = metrics->AddGauge(count_ss.str(), 0);
stringstream max_ss;
max_ss << "impala.thrift-server." << name << ".total-connections";
- total_connections_metric_ = metrics->AddCounter<int64_t>(max_ss.str(), 0);
+ total_connections_metric_ = metrics->AddCounter(max_ss.str(), 0);
metrics_ = metrics;
} else {
metrics_enabled_ = false;
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/client-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index 8c0b6aa..af530f7 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -94,7 +94,7 @@ Status ClientCacheHelper::ReopenClient(ClientFactory factory_method,
// CreateClient() will increment total_clients_metric_ if succeed.
if (metrics_enabled_) {
total_clients_metric_->Increment(-1);
- DCHECK_GE(total_clients_metric_->value(), 0);
+ DCHECK_GE(total_clients_metric_->GetValue(), 0);
}
lock_guard<mutex> lock(client_map_lock_);
client_map_.erase(client);
@@ -235,11 +235,11 @@ void ClientCacheHelper::InitMetrics(MetricGroup* metrics, const string& key_pref
lock_guard<mutex> lock(cache_lock_);
stringstream count_ss;
count_ss << key_prefix << ".client-cache.clients-in-use";
- clients_in_use_metric_ = metrics->AddGauge<int64_t>(count_ss.str(), 0);
+ clients_in_use_metric_ = metrics->AddGauge(count_ss.str(), 0);
stringstream max_ss;
max_ss << key_prefix << ".client-cache.total-clients";
- total_clients_metric_ = metrics->AddGauge<int64_t>(max_ss.str(), 0);
+ total_clients_metric_ = metrics->AddGauge(max_ss.str(), 0);
metrics_enabled_ = true;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index 93c524e..45eee7f 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -57,10 +57,10 @@ namespace impala {
DataStreamMgr::DataStreamMgr(MetricGroup* metrics) {
metrics_ = metrics->GetOrCreateChildGroup("datastream-manager");
num_senders_waiting_ =
- metrics_->AddGauge<int64_t>("senders-blocked-on-recvr-creation", 0L);
+ metrics_->AddGauge("senders-blocked-on-recvr-creation", 0L);
total_senders_waited_ =
- metrics_->AddCounter<int64_t>("total-senders-blocked-on-recvr-creation", 0L);
- num_senders_timedout_ = metrics_->AddCounter<int64_t>(
+ metrics_->AddCounter("total-senders-blocked-on-recvr-creation", 0L);
+ num_senders_timedout_ = metrics_->AddCounter(
"total-senders-timedout-waiting-for-recvr-creation", 0L);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 6d9a857..f191921 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -333,9 +333,9 @@ Status ExecEnv::Init() {
// Also need a MemTracker for unused reservations as a negative value. Unused
// reservations are counted against queries but not against the process memory
// consumption. This accounts for that difference.
- IntGauge* negated_unused_reservation = obj_pool_->Add(new NegatedGauge<int64_t>(
- MakeTMetricDef("negated_unused_reservation", TMetricKind::GAUGE, TUnit::BYTES),
- BufferPoolMetric::UNUSED_RESERVATION_BYTES));
+ IntGauge* negated_unused_reservation = obj_pool_->Add(new NegatedGauge(
+ MakeTMetricDef("negated_unused_reservation", TMetricKind::GAUGE, TUnit::BYTES),
+ BufferPoolMetric::UNUSED_RESERVATION_BYTES));
obj_pool_->Add(new MemTracker(negated_unused_reservation, -1,
"Buffer Pool: Unused Reservation", mem_tracker_.get()));
#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
@@ -350,13 +350,13 @@ Status ExecEnv::Init() {
// reserved (TcmallocMetric::PHYSICAL_BYTES_RESERVED) and the bytes in use
// (TcmallocMetrics::BYTES_IN_USE). This overhead accounts for all the cached freelists
// used by TCMalloc.
- IntGauge* negated_bytes_in_use = obj_pool_->Add(new NegatedGauge<int64_t>(
+ IntGauge* negated_bytes_in_use = obj_pool_->Add(new NegatedGauge(
MakeTMetricDef("negated_tcmalloc_bytes_in_use", TMetricKind::GAUGE, TUnit::BYTES),
TcmallocMetric::BYTES_IN_USE));
vector<IntGauge*> overhead_metrics;
overhead_metrics.push_back(negated_bytes_in_use);
overhead_metrics.push_back(TcmallocMetric::PHYSICAL_BYTES_RESERVED);
- SumGauge<int64_t>* tcmalloc_overhead = obj_pool_->Add(new SumGauge<int64_t>(
+ SumGauge* tcmalloc_overhead = obj_pool_->Add(new SumGauge(
MakeTMetricDef("tcmalloc_overhead", TMetricKind::GAUGE, TUnit::BYTES),
overhead_metrics));
obj_pool_->Add(
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index dc14050..21daa96 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -335,8 +335,8 @@ void ScanRange::Close() {
struct hdfsHedgedReadMetrics* hedged_metrics;
int success = hdfsGetHedgedReadMetrics(fs_, &hedged_metrics);
if (success == 0) {
- ImpaladMetrics::HEDGED_READ_OPS->set_value(hedged_metrics->hedgedReadOps);
- ImpaladMetrics::HEDGED_READ_OPS_WIN->set_value(hedged_metrics->hedgedReadOpsWin);
+ ImpaladMetrics::HEDGED_READ_OPS->SetValue(hedged_metrics->hedgedReadOps);
+ ImpaladMetrics::HEDGED_READ_OPS_WIN->SetValue(hedged_metrics->hedgedReadOpsWin);
hdfsFreeHedgedReadMetrics(hedged_metrics);
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 348b9ab..86955c8 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -63,10 +63,10 @@ KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
boost::bind(&KrpcDataStreamMgr::DeserializeThreadFn, this, _1, _2)) {
MetricGroup* dsm_metrics = metrics->GetOrCreateChildGroup("datastream-manager");
num_senders_waiting_ =
- dsm_metrics->AddGauge<int64_t>("senders-blocked-on-recvr-creation", 0L);
+ dsm_metrics->AddGauge("senders-blocked-on-recvr-creation", 0L);
total_senders_waited_ =
- dsm_metrics->AddCounter<int64_t>("total-senders-blocked-on-recvr-creation", 0L);
- num_senders_timedout_ = dsm_metrics->AddCounter<int64_t>(
+ dsm_metrics->AddCounter("total-senders-blocked-on-recvr-creation", 0L);
+ num_senders_timedout_ = dsm_metrics->AddCounter(
"total-senders-timedout-waiting-for-recvr-creation", 0L);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/mem-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-test.cc b/be/src/runtime/mem-tracker-test.cc
index 4aaac05..faeb6a9 100644
--- a/be/src/runtime/mem-tracker-test.cc
+++ b/be/src/runtime/mem-tracker-test.cc
@@ -62,13 +62,13 @@ TEST(MemTestTest, ConsumptionMetric) {
md.__set_units(TUnit::BYTES);
md.__set_kind(TMetricKind::GAUGE);
IntGauge metric(md, 0);
- EXPECT_EQ(metric.value(), 0);
+ EXPECT_EQ(metric.GetValue(), 0);
TMetricDef neg_md;
neg_md.__set_key("neg_test");
neg_md.__set_units(TUnit::BYTES);
neg_md.__set_kind(TMetricKind::GAUGE);
- NegatedGauge<int64_t> neg_metric(neg_md, &metric);
+ NegatedGauge neg_metric(neg_md, &metric);
MemTracker t(&metric, 100, "");
MemTracker neg_t(&neg_metric, 100, "");
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 98f45db..e5aa290 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -211,16 +211,16 @@ MemTracker::~MemTracker() {
}
void MemTracker::RegisterMetrics(MetricGroup* metrics, const string& prefix) {
- num_gcs_metric_ = metrics->AddCounter<int64_t>(Substitute("$0.num-gcs", prefix), 0);
+ num_gcs_metric_ = metrics->AddCounter(Substitute("$0.num-gcs", prefix), 0);
// TODO: Consider a total amount of bytes freed counter
- bytes_freed_by_last_gc_metric_ = metrics->AddGauge<int64_t>(
+ bytes_freed_by_last_gc_metric_ = metrics->AddGauge(
Substitute("$0.bytes-freed-by-last-gc", prefix), -1);
- bytes_over_limit_metric_ = metrics->AddGauge<int64_t>(
+ bytes_over_limit_metric_ = metrics->AddGauge(
Substitute("$0.bytes-over-limit", prefix), -1);
- limit_metric_ = metrics->AddGauge<int64_t>(Substitute("$0.limit", prefix), limit_);
+ limit_metric_ = metrics->AddGauge(Substitute("$0.limit", prefix), limit_);
}
// Calling this on the query tracker results in output like:
@@ -430,7 +430,7 @@ bool MemTracker::GcMemory(int64_t max_consumption) {
}
if (bytes_freed_by_last_gc_metric_ != NULL) {
- bytes_freed_by_last_gc_metric_->set_value(pre_gc_consumption - curr_consumption);
+ bytes_freed_by_last_gc_metric_->SetValue(pre_gc_consumption - curr_consumption);
}
return curr_consumption > max_consumption;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index fb1cd90..c582d72 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -250,7 +250,7 @@ class MemTracker {
bool LimitExceeded() {
if (UNLIKELY(CheckLimitExceeded())) {
if (bytes_over_limit_metric_ != NULL) {
- bytes_over_limit_metric_->set_value(consumption() - limit_);
+ bytes_over_limit_metric_->SetValue(consumption() - limit_);
}
return GcMemory(limit_);
}
@@ -274,7 +274,7 @@ class MemTracker {
/// call if this tracker has a consumption metric.
void RefreshConsumptionFromMetric() {
DCHECK(consumption_metric_ != nullptr);
- consumption_->Set(consumption_metric_->value());
+ consumption_->Set(consumption_metric_->GetValue());
}
int64_t limit() const { return limit_; }
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 4f30f4e..316b712 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -123,7 +123,7 @@ void QueryExecMgr::StartQueryHelper(QueryState* qs) {
#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
// tcmalloc and address or thread sanitizer cannot be used together
if (FLAGS_log_mem_usage_interval > 0) {
- uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value();
+ uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->GetValue();
if (num_complete % FLAGS_log_mem_usage_interval == 0) {
char buf[2048];
// This outputs how much memory is currently being used by this impalad
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 10c8033..259cd34 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -381,11 +381,13 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
<< " fragment_idx=" << fis->instance_ctx().fragment_idx
<< " per_fragment_instance_idx=" << fis->instance_ctx().per_fragment_instance_idx
<< " coord_state_idx=" << rpc_params().coord_state_idx
- << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value();
+ << " #in-flight="
+ << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
Status status = fis->Exec();
ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id())
- << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value()
+ << " #in-flight="
+ << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue()
<< " status=" << status;
// initiate cancellation if nobody has done so yet
if (!status.ok()) Cancel();
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index fbc0a36..3091c58 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -78,7 +78,7 @@ class TmpFileMgrTest : public ::testing::Test {
vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->ActiveTmpDevices();
IntGauge* active_metric =
metrics_->FindMetricForTesting<IntGauge>("tmp-file-mgr.active-scratch-dirs");
- EXPECT_EQ(active.size(), active_metric->value());
+ EXPECT_EQ(active.size(), active_metric->GetValue());
SetMetric<string>* active_set_metric =
metrics_->FindMetricForTesting<SetMetric<string>>(
"tmp-file-mgr.active-scratch-dirs.list");
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 650af0b..d35d302 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -132,10 +132,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
DCHECK(metrics != nullptr);
num_active_scratch_dirs_metric_ =
- metrics->AddGauge<int64_t>(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
+ metrics->AddGauge(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(
metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
- num_active_scratch_dirs_metric_->set_value(tmp_dirs_.size());
+ num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size());
for (int i = 0; i < tmp_dirs_.size(); ++i) {
active_scratch_dirs_metric_->Add(tmp_dirs_[i]);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 99f659a..f43af2c 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -482,9 +482,9 @@ bool AdmissionController::RejectImmediately(QuerySchedule* schedule,
}
void AdmissionController::PoolStats::UpdateConfigMetrics(const TPoolConfig& pool_cfg) {
- metrics_.pool_max_mem_resources->set_value(pool_cfg.max_mem_resources);
- metrics_.pool_max_requests->set_value(pool_cfg.max_requests);
- metrics_.pool_max_queued->set_value(pool_cfg.max_queued);
+ metrics_.pool_max_mem_resources->SetValue(pool_cfg.max_mem_resources);
+ metrics_.pool_max_requests->SetValue(pool_cfg.max_requests);
+ metrics_.pool_max_queued->SetValue(pool_cfg.max_queued);
}
Status AdmissionController::AdmitQuery(QuerySchedule* schedule) {
@@ -734,18 +734,18 @@ void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reser
if (agg_num_running_ == num_running && agg_num_queued_ == num_queued &&
agg_mem_reserved_ == mem_reserved) {
- DCHECK_EQ(num_running, metrics_.agg_num_running->value());
- DCHECK_EQ(num_queued, metrics_.agg_num_queued->value());
- DCHECK_EQ(mem_reserved, metrics_.agg_mem_reserved->value());
+ DCHECK_EQ(num_running, metrics_.agg_num_running->GetValue());
+ DCHECK_EQ(num_queued, metrics_.agg_num_queued->GetValue());
+ DCHECK_EQ(mem_reserved, metrics_.agg_mem_reserved->GetValue());
return;
}
VLOG_ROW << "Recomputed agg stats, previous: " << DebugString();
agg_num_running_ = num_running;
agg_num_queued_ = num_queued;
agg_mem_reserved_ = mem_reserved;
- metrics_.agg_num_running->set_value(num_running);
- metrics_.agg_num_queued->set_value(num_queued);
- metrics_.agg_mem_reserved->set_value(mem_reserved);
+ metrics_.agg_num_running->SetValue(num_running);
+ metrics_.agg_num_queued->SetValue(num_queued);
+ metrics_.agg_mem_reserved->SetValue(mem_reserved);
VLOG_ROW << "Updated: " << DebugString();
}
@@ -782,12 +782,12 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() {
if (current_reserved != local_stats_.backend_mem_reserved) {
parent_->pools_for_updates_.insert(name_);
local_stats_.backend_mem_reserved = current_reserved;
- metrics_.local_backend_mem_reserved->set_value(current_reserved);
+ metrics_.local_backend_mem_reserved->SetValue(current_reserved);
}
const int64_t current_usage =
tracker == nullptr ? static_cast<int64_t>(0) : tracker->consumption();
- metrics_.local_backend_mem_usage->set_value(current_usage);
+ metrics_.local_backend_mem_usage->SetValue(current_usage);
}
void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
@@ -906,44 +906,44 @@ AdmissionController::GetPoolStats(const string& pool_name) {
}
void AdmissionController::PoolStats::InitMetrics() {
- metrics_.total_admitted = parent_->metrics_group_->AddCounter<int64_t>(
+ metrics_.total_admitted = parent_->metrics_group_->AddCounter(
TOTAL_ADMITTED_METRIC_KEY_FORMAT, 0, name_);
- metrics_.total_queued = parent_->metrics_group_->AddCounter<int64_t>(
+ metrics_.total_queued = parent_->metrics_group_->AddCounter(
TOTAL_QUEUED_METRIC_KEY_FORMAT, 0, name_);
- metrics_.total_dequeued = parent_->metrics_group_->AddCounter<int64_t>(
+ metrics_.total_dequeued = parent_->metrics_group_->AddCounter(
TOTAL_DEQUEUED_METRIC_KEY_FORMAT, 0, name_);
- metrics_.total_rejected = parent_->metrics_group_->AddCounter<int64_t>(
+ metrics_.total_rejected = parent_->metrics_group_->AddCounter(
TOTAL_REJECTED_METRIC_KEY_FORMAT, 0, name_);
- metrics_.total_timed_out = parent_->metrics_group_->AddCounter<int64_t>(
+ metrics_.total_timed_out = parent_->metrics_group_->AddCounter(
TOTAL_TIMED_OUT_METRIC_KEY_FORMAT, 0, name_);
- metrics_.total_released = parent_->metrics_group_->AddCounter<int64_t>(
+ metrics_.total_released = parent_->metrics_group_->AddCounter(
TOTAL_RELEASED_METRIC_KEY_FORMAT, 0, name_);
- metrics_.time_in_queue_ms = parent_->metrics_group_->AddCounter<int64_t>(
+ metrics_.time_in_queue_ms = parent_->metrics_group_->AddCounter(
TIME_IN_QUEUE_METRIC_KEY_FORMAT, 0, name_);
- metrics_.agg_num_running = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.agg_num_running = parent_->metrics_group_->AddGauge(
AGG_NUM_RUNNING_METRIC_KEY_FORMAT, 0, name_);
- metrics_.agg_num_queued = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.agg_num_queued = parent_->metrics_group_->AddGauge(
AGG_NUM_QUEUED_METRIC_KEY_FORMAT, 0, name_);
- metrics_.agg_mem_reserved = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.agg_mem_reserved = parent_->metrics_group_->AddGauge(
AGG_MEM_RESERVED_METRIC_KEY_FORMAT, 0, name_);
- metrics_.local_mem_admitted = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.local_mem_admitted = parent_->metrics_group_->AddGauge(
LOCAL_MEM_ADMITTED_METRIC_KEY_FORMAT, 0, name_);
- metrics_.local_num_admitted_running = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.local_num_admitted_running = parent_->metrics_group_->AddGauge(
LOCAL_NUM_ADMITTED_RUNNING_METRIC_KEY_FORMAT, 0, name_);
- metrics_.local_num_queued = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.local_num_queued = parent_->metrics_group_->AddGauge(
LOCAL_NUM_QUEUED_METRIC_KEY_FORMAT, 0, name_);
- metrics_.local_backend_mem_usage = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.local_backend_mem_usage = parent_->metrics_group_->AddGauge(
LOCAL_BACKEND_MEM_USAGE_METRIC_KEY_FORMAT, 0, name_);
- metrics_.local_backend_mem_reserved = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.local_backend_mem_reserved = parent_->metrics_group_->AddGauge(
LOCAL_BACKEND_MEM_RESERVED_METRIC_KEY_FORMAT, 0, name_);
- metrics_.pool_max_mem_resources = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.pool_max_mem_resources = parent_->metrics_group_->AddGauge(
POOL_MAX_MEM_RESOURCES_METRIC_KEY_FORMAT, 0, name_);
- metrics_.pool_max_requests = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.pool_max_requests = parent_->metrics_group_->AddGauge(
POOL_MAX_REQUESTS_METRIC_KEY_FORMAT, 0, name_);
- metrics_.pool_max_queued = parent_->metrics_group_->AddGauge<int64_t>(
+ metrics_.pool_max_queued = parent_->metrics_group_->AddGauge(
POOL_MAX_QUEUED_METRIC_KEY_FORMAT, 0, name_);
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5cf0f01..e924f50 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -97,11 +97,10 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
// This is after registering with the statestored, so we already have to synchronize
// access to the executors_config_ shared_ptr.
int num_backends = GetExecutorsConfig()->NumBackends();
- total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0);
- total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0);
+ total_assignments_ = metrics_->AddCounter(ASSIGNMENTS_KEY, 0);
+ total_local_assignments_ = metrics_->AddCounter(LOCAL_ASSIGNMENTS_KEY, 0);
initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
- num_fragment_instances_metric_ =
- metrics_->AddGauge<int64_t>(NUM_BACKENDS_KEY, num_backends);
+ num_fragment_instances_metric_ = metrics_->AddGauge(NUM_BACKENDS_KEY, num_backends);
}
if (statestore_subscriber_ != nullptr) {
@@ -197,7 +196,7 @@ void Scheduler::UpdateMembership(
if (metrics_ != nullptr) {
/// TODO-MT: fix this (do we even need to report it?)
- num_fragment_instances_metric_->set_value(current_executors_.size());
+ num_fragment_instances_metric_->SetValue(current_executors_.size());
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 6358145..a62130c 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1059,8 +1059,8 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
Status ImpalaServer::UpdateCatalogMetrics() {
TGetDbsResult dbs;
RETURN_IF_ERROR(exec_env_->frontend()->GetDbs(nullptr, nullptr, &dbs));
- ImpaladMetrics::CATALOG_NUM_DBS->set_value(dbs.dbs.size());
- ImpaladMetrics::CATALOG_NUM_TABLES->set_value(0L);
+ ImpaladMetrics::CATALOG_NUM_DBS->SetValue(dbs.dbs.size());
+ ImpaladMetrics::CATALOG_NUM_TABLES->SetValue(0L);
for (const TDatabase& db: dbs.dbs) {
TGetTablesResult table_names;
RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db.db_name, nullptr, nullptr,
@@ -1433,7 +1433,7 @@ void ImpalaServer::CatalogUpdateCallback(
TTopicDelta& update = subscriber_topic_updates->back();
update.topic_name = CatalogServer::IMPALA_CATALOG_TOPIC;
update.__set_from_version(0L);
- ImpaladMetrics::CATALOG_READY->set_value(false);
+ ImpaladMetrics::CATALOG_READY->SetValue(false);
// Dropped all cached lib files (this behaves as if all functions and data
// sources are dropped).
LibCache::instance()->DropCache();
@@ -1447,7 +1447,7 @@ void ImpalaServer::CatalogUpdateCallback(
LOG(INFO) << "Catalog topic update applied with version: " << new_catalog_version
<< " new min catalog object version: " << resp.min_catalog_object_version;
}
- ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0);
+ ImpaladMetrics::CATALOG_READY->SetValue(new_catalog_version > 0);
// TODO: deal with an error status
discard_result(UpdateCatalogMetrics());
// Remove all dropped objects from the library cache.
@@ -2130,7 +2130,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
}
services_started_ = true;
- ImpaladMetrics::IMPALA_SERVER_READY->set_value(true);
+ ImpaladMetrics::IMPALA_SERVER_READY->SetValue(true);
LOG(INFO) << "Impala has started.";
return Status::OK();
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/service/session-expiry-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index fa69476..a211701 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -58,8 +58,8 @@ TEST(SessionTest, TestExpiry) {
IntGauge* hs2_session_metric =
impala->metrics()->FindMetricForTesting<IntGauge>(
ImpaladMetricKeys::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS);
- EXPECT_EQ(expired_metric->value(), 0L);
- EXPECT_EQ(beeswax_session_metric->value(), 0L);
+ EXPECT_EQ(expired_metric->GetValue(), 0L);
+ EXPECT_EQ(beeswax_session_metric->GetValue(), 0L);
{
scoped_ptr<ThriftClient<ImpalaServiceClient>> beeswax_clients[NUM_SESSIONS];
@@ -80,16 +80,16 @@ TEST(SessionTest, TestExpiry) {
}
int64_t start = UnixMillis();
- while (expired_metric->value() != NUM_SESSIONS * 2 &&
+ while (expired_metric->GetValue() != NUM_SESSIONS * 2 &&
UnixMillis() - start < MAX_IDLE_TIMEOUT_MS) {
SleepForMs(100);
}
- ASSERT_EQ(expired_metric->value(), NUM_SESSIONS * 2)
+ ASSERT_EQ(expired_metric->GetValue(), NUM_SESSIONS * 2)
<< "Sessions did not expire within "<< MAX_IDLE_TIMEOUT_MS / 1000 <<" secs";
- ASSERT_EQ(beeswax_session_metric->value(), NUM_SESSIONS)
+ ASSERT_EQ(beeswax_session_metric->GetValue(), NUM_SESSIONS)
<< "Beeswax sessions unexpectedly closed after expiration";
- ASSERT_EQ(hs2_session_metric->value(), NUM_SESSIONS)
+ ASSERT_EQ(hs2_session_metric->GetValue(), NUM_SESSIONS)
<< "HiveServer2 sessions unexpectedly closed after expiration";
TPingImpalaServiceResp resp;
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 678236e..99da183 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -113,7 +113,7 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")) {
connected_to_statestore_metric_ =
metrics_->AddProperty("statestore-subscriber.connected", false);
- last_recovery_duration_metric_ = metrics_->AddGauge(
+ last_recovery_duration_metric_ = metrics_->AddDoubleGauge(
"statestore-subscriber.last-recovery-duration", 0.0);
last_recovery_time_metric_ = metrics_->AddProperty<string>(
"statestore-subscriber.last-recovery-time", "N/A");
@@ -164,12 +164,12 @@ Status StatestoreSubscriber::Register() {
RETURN_IF_ERROR(client.DoRpc(&StatestoreServiceClientWrapper::RegisterSubscriber,
request, &response));
Status status = Status(response.status);
- if (status.ok()) connected_to_statestore_metric_->set_value(true);
+ if (status.ok()) connected_to_statestore_metric_->SetValue(true);
if (response.__isset.registration_id) {
lock_guard<mutex> l(registration_id_lock_);
registration_id_ = response.registration_id;
const string& registration_string = PrintId(registration_id_);
- registration_id_metric_->set_value(registration_string);
+ registration_id_metric_->SetValue(registration_string);
VLOG(1) << "Subscriber registration ID: " << registration_string;
} else {
VLOG(1) << "No subscriber registration ID received from statestore";
@@ -243,7 +243,7 @@ void StatestoreSubscriber::RecoveryModeChecker() {
lock_guard<mutex> l(lock_);
MonotonicStopWatch recovery_timer;
recovery_timer.Start();
- connected_to_statestore_metric_->set_value(false);
+ connected_to_statestore_metric_->SetValue(false);
LOG(INFO) << subscriber_id_
<< ": Connection with statestore lost, entering recovery mode";
uint32_t attempt_count = 1;
@@ -265,7 +265,7 @@ void StatestoreSubscriber::RecoveryModeChecker() {
<< status.GetDetail();
SleepForMs(SLEEP_INTERVAL_MS);
}
- last_recovery_duration_metric_->set_value(
+ last_recovery_duration_metric_->SetValue(
recovery_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
}
// When we're successful in re-registering, we don't do anything
@@ -273,9 +273,9 @@ void StatestoreSubscriber::RecoveryModeChecker() {
// responsibility of individual clients to post missing updates
// back to the statestore. This saves a lot of complexity where
// we would otherwise have to cache updates here.
- last_recovery_duration_metric_->set_value(
+ last_recovery_duration_metric_->SetValue(
recovery_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
- last_recovery_time_metric_->set_value(CurrentTimeString());
+ last_recovery_time_metric_->SetValue(CurrentTimeString());
}
SleepForMs(SLEEP_INTERVAL_MS);
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index d0a4851..b135e38 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -236,13 +236,12 @@ Statestore::Statestore(MetricGroup* metrics)
FLAGS_statestore_max_missed_heartbeats / 2)) {
DCHECK(metrics != NULL);
- num_subscribers_metric_ =
- metrics->AddGauge<int64_t>(STATESTORE_LIVE_SUBSCRIBERS, 0);
+ num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics,
STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>());
- key_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0);
- value_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0);
- topic_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0);
+ key_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0);
+ value_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0);
+ topic_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0);
topic_update_duration_metric_ =
StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_UPDATE_DURATION);
@@ -398,7 +397,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
subscribers_.insert(make_pair(subscriber_id, current_registration));
failure_detector_->UpdateHeartbeat(
PrintId(current_registration->registration_id()), true);
- num_subscribers_metric_->set_value(subscribers_.size());
+ num_subscribers_metric_->SetValue(subscribers_.size());
subscriber_set_metric_->Add(subscriber_id);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/common-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/common-metrics.cc b/be/src/util/common-metrics.cc
index d147862..114e0e0 100644
--- a/be/src/util/common-metrics.cc
+++ b/be/src/util/common-metrics.cc
@@ -33,7 +33,7 @@ void CommonMetrics::InitCommonMetrics(MetricGroup* metric_group) {
KUDU_CLIENT_VERSION = metric_group->AddProperty<string>(
KUDU_CLIENT_VERSION_METRIC_NAME, kudu::client::GetShortVersionString());
- PROCESS_START_TIME->set_value(CurrentTimeString());
+ PROCESS_START_TIME->SetValue(CurrentTimeString());
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/default-path-handlers.cc
----------------------------------------------------------------------
diff --git a/be/src/util/default-path-handlers.cc b/be/src/util/default-path-handlers.cc
index 88d23f1..10966b4 100644
--- a/be/src/util/default-path-handlers.cc
+++ b/be/src/util/default-path-handlers.cc
@@ -211,7 +211,7 @@ void RootHandler(const Webserver::ArgumentMap& args, Document* document) {
document->GetAllocator());
if (CommonMetrics::PROCESS_START_TIME != nullptr) {
- Value process_start_time(CommonMetrics::PROCESS_START_TIME->value().c_str(),
+ Value process_start_time(CommonMetrics::PROCESS_START_TIME->GetValue().c_str(),
document->GetAllocator());
document->AddMember("process_start_time", process_start_time,
document->GetAllocator());
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/impalad-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 1325f2e..18e96a8 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -162,70 +162,70 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
IMPALA_SERVER_READY = m->AddProperty<bool>(
ImpaladMetricKeys::IMPALA_SERVER_READY, false);
- IMPALA_SERVER_NUM_QUERIES = m->AddCounter<int64_t>(
+ IMPALA_SERVER_NUM_QUERIES = m->AddCounter(
ImpaladMetricKeys::IMPALA_SERVER_NUM_QUERIES, 0);
- NUM_QUERIES_REGISTERED = m->AddGauge<int64_t>(ImpaladMetricKeys::NUM_QUERIES_REGISTERED, 0);
- NUM_QUERIES_EXPIRED = m->AddCounter<int64_t>(
+ NUM_QUERIES_REGISTERED = m->AddGauge(
+ ImpaladMetricKeys::NUM_QUERIES_REGISTERED, 0);
+ NUM_QUERIES_EXPIRED = m->AddCounter(
ImpaladMetricKeys::NUM_QUERIES_EXPIRED, 0);
- NUM_QUERIES_SPILLED = m->AddCounter<int64_t>(
+ NUM_QUERIES_SPILLED = m->AddCounter(
ImpaladMetricKeys::NUM_QUERIES_SPILLED, 0);
- IMPALA_SERVER_NUM_FRAGMENTS = m->AddCounter<int64_t>(
+ IMPALA_SERVER_NUM_FRAGMENTS = m->AddCounter(
ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS, 0);
IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT = m->AddGauge(
ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT, 0L);
- IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS = m->AddGauge<int64_t>(
+ IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS = m->AddGauge(
ImpaladMetricKeys::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS, 0);
- IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS = m->AddGauge<int64_t>(
+ IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS = m->AddGauge(
ImpaladMetricKeys::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS, 0);
- NUM_SESSIONS_EXPIRED = m->AddCounter<int64_t>(
+ NUM_SESSIONS_EXPIRED = m->AddCounter(
ImpaladMetricKeys::NUM_SESSIONS_EXPIRED, 0);
- RESULTSET_CACHE_TOTAL_NUM_ROWS = m->AddGauge<int64_t>(
+ RESULTSET_CACHE_TOTAL_NUM_ROWS = m->AddGauge(
ImpaladMetricKeys::RESULTSET_CACHE_TOTAL_NUM_ROWS, 0);
- RESULTSET_CACHE_TOTAL_BYTES = m->AddGauge<int64_t>(
+ RESULTSET_CACHE_TOTAL_BYTES = m->AddGauge(
ImpaladMetricKeys::RESULTSET_CACHE_TOTAL_BYTES, 0);
// Initialize scan node metrics
- NUM_RANGES_PROCESSED = m->AddCounter<int64_t>(
+ NUM_RANGES_PROCESSED = m->AddCounter(
ImpaladMetricKeys::TOTAL_SCAN_RANGES_PROCESSED, 0);
- NUM_RANGES_MISSING_VOLUME_ID = m->AddCounter<int64_t>(
+ NUM_RANGES_MISSING_VOLUME_ID = m->AddCounter(
ImpaladMetricKeys::NUM_SCAN_RANGES_MISSING_VOLUME_ID, 0);
// Initialize memory usage metrics
- MEM_POOL_TOTAL_BYTES = m->AddGauge<int64_t>(
+ MEM_POOL_TOTAL_BYTES = m->AddGauge(
ImpaladMetricKeys::MEM_POOL_TOTAL_BYTES, 0);
- HASH_TABLE_TOTAL_BYTES = m->AddGauge<int64_t>(
+ HASH_TABLE_TOTAL_BYTES = m->AddGauge(
ImpaladMetricKeys::HASH_TABLE_TOTAL_BYTES, 0);
// Initialize insert metrics
- NUM_FILES_OPEN_FOR_INSERT = m->AddGauge<int64_t>(
+ NUM_FILES_OPEN_FOR_INSERT = m->AddGauge(
ImpaladMetricKeys::NUM_FILES_OPEN_FOR_INSERT, 0);
// Initialize IO mgr metrics
- IO_MGR_NUM_OPEN_FILES = m->AddGauge<int64_t>(
- ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
- IO_MGR_NUM_BUFFERS = m->AddGauge<int64_t>(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0);
- IO_MGR_TOTAL_BYTES = m->AddGauge<int64_t>(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0);
- IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge<int64_t>(
+ IO_MGR_NUM_OPEN_FILES = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
+ IO_MGR_NUM_BUFFERS = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0);
+ IO_MGR_TOTAL_BYTES = m->AddGauge(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0);
+ IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge(
ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS, 0);
- IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge<int64_t>(
+ IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge(
ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES, 0);
- IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge<int64_t>(
+ IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge(
ImpaladMetricKeys::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING, 0);
- IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = m->AddGauge<int64_t>(
+ IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = m->AddGauge(
ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT, 0);
- IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = m->AddGauge<int64_t>(
+ IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = m->AddGauge(
ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT, 0);
- IO_MGR_BYTES_READ = m->AddCounter<int64_t>(ImpaladMetricKeys::IO_MGR_BYTES_READ, 0);
- IO_MGR_LOCAL_BYTES_READ = m->AddCounter<int64_t>(
+ IO_MGR_BYTES_READ = m->AddCounter(ImpaladMetricKeys::IO_MGR_BYTES_READ, 0);
+ IO_MGR_LOCAL_BYTES_READ = m->AddCounter(
ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0);
- IO_MGR_CACHED_BYTES_READ = m->AddCounter<int64_t>(
+ IO_MGR_CACHED_BYTES_READ = m->AddCounter(
ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0);
- IO_MGR_SHORT_CIRCUIT_BYTES_READ = m->AddCounter<int64_t>(
+ IO_MGR_SHORT_CIRCUIT_BYTES_READ = m->AddCounter(
ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ, 0);
- IO_MGR_BYTES_WRITTEN = m->AddCounter<int64_t>(
+ IO_MGR_BYTES_WRITTEN = m->AddCounter(
ImpaladMetricKeys::IO_MGR_BYTES_WRITTEN, 0);
IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO =
@@ -233,8 +233,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO);
// Initialize catalog metrics
- CATALOG_NUM_DBS = m->AddGauge<int64_t>(ImpaladMetricKeys::CATALOG_NUM_DBS, 0);
- CATALOG_NUM_TABLES = m->AddGauge<int64_t>(ImpaladMetricKeys::CATALOG_NUM_TABLES, 0);
+ CATALOG_NUM_DBS = m->AddGauge(ImpaladMetricKeys::CATALOG_NUM_DBS, 0);
+ CATALOG_NUM_TABLES = m->AddGauge(ImpaladMetricKeys::CATALOG_NUM_TABLES, 0);
CATALOG_READY = m->AddProperty<bool>(ImpaladMetricKeys::CATALOG_READY, false);
// Maximum duration to be tracked by the query durations metric. No particular reasoning
@@ -248,8 +248,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
MetricDefs::Get(ImpaladMetricKeys::DDL_DURATIONS), FIVE_HOURS_IN_MS, 3));
// Initialize Hedged read metrics
- HEDGED_READ_OPS = m->AddCounter<int64_t>(ImpaladMetricKeys::HEDGED_READ_OPS, 0);
- HEDGED_READ_OPS_WIN = m->AddCounter<int64_t>(ImpaladMetricKeys::HEDGED_READ_OPS_WIN, 0);
+ HEDGED_READ_OPS = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS, 0);
+ HEDGED_READ_OPS_WIN = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS_WIN, 0);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index 3308bf4..fd78343 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -32,7 +32,7 @@ using namespace strings;
DECLARE_bool(mmap_buffers);
-SumGauge<int64_t>* AggregateMemoryMetrics::TOTAL_USED = nullptr;
+SumGauge* AggregateMemoryMetrics::TOTAL_USED = nullptr;
IntGauge* AggregateMemoryMetrics::NUM_MAPS = nullptr;
IntGauge* AggregateMemoryMetrics::MAPPED_BYTES = nullptr;
IntGauge* AggregateMemoryMetrics::RSS = nullptr;
@@ -110,19 +110,19 @@ Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_met
#endif
MetricGroup* aggregate_metrics = metrics->GetOrCreateChildGroup("memory");
AggregateMemoryMetrics::TOTAL_USED = aggregate_metrics->RegisterMetric(
- new SumGauge<int64_t>(MetricDefs::Get("memory.total-used"), used_metrics));
+ new SumGauge(MetricDefs::Get("memory.total-used"), used_metrics));
if (register_jvm_metrics) {
RETURN_IF_ERROR(JvmMetric::InitMetrics(metrics->GetOrCreateChildGroup("jvm")));
}
if (MemInfo::HaveSmaps()) {
AggregateMemoryMetrics::NUM_MAPS =
- aggregate_metrics->AddGauge<int64_t>("memory.num-maps", 0U);
+ aggregate_metrics->AddGauge("memory.num-maps", 0U);
AggregateMemoryMetrics::MAPPED_BYTES =
- aggregate_metrics->AddGauge<int64_t>("memory.mapped-bytes", 0U);
- AggregateMemoryMetrics::RSS = aggregate_metrics->AddGauge<int64_t>("memory.rss", 0U);
+ aggregate_metrics->AddGauge("memory.mapped-bytes", 0U);
+ AggregateMemoryMetrics::RSS = aggregate_metrics->AddGauge("memory.rss", 0U);
AggregateMemoryMetrics::ANON_HUGE_PAGE_BYTES =
- aggregate_metrics->AddGauge<int64_t>("memory.anon-huge-page-bytes", 0U);
+ aggregate_metrics->AddGauge("memory.anon-huge-page-bytes", 0U);
}
ThpConfig thp_config = MemInfo::ParseThpConfig();
AggregateMemoryMetrics::THP_ENABLED =
@@ -139,16 +139,16 @@ void AggregateMemoryMetrics::Refresh() {
if (NUM_MAPS != nullptr) {
// Only call ParseSmaps() if the metrics were created.
MappedMemInfo map_info = MemInfo::ParseSmaps();
- NUM_MAPS->set_value(map_info.num_maps);
- MAPPED_BYTES->set_value(map_info.size_kb * 1024);
- RSS->set_value(map_info.rss_kb * 1024);
- ANON_HUGE_PAGE_BYTES->set_value(map_info.anon_huge_pages_kb * 1024);
+ NUM_MAPS->SetValue(map_info.num_maps);
+ MAPPED_BYTES->SetValue(map_info.size_kb * 1024);
+ RSS->SetValue(map_info.rss_kb * 1024);
+ ANON_HUGE_PAGE_BYTES->SetValue(map_info.anon_huge_pages_kb * 1024);
}
ThpConfig thp_config = MemInfo::ParseThpConfig();
- THP_ENABLED->set_value(thp_config.enabled);
- THP_DEFRAG->set_value(thp_config.defrag);
- THP_KHUGEPAGED_DEFRAG->set_value(thp_config.khugepaged_defrag);
+ THP_ENABLED->SetValue(thp_config.enabled);
+ THP_DEFRAG->SetValue(thp_config.defrag);
+ THP_KHUGEPAGED_DEFRAG->SetValue(thp_config.khugepaged_defrag);
}
JvmMetric* JvmMetric::CreateAndRegister(MetricGroup* metrics, const string& key,
@@ -192,35 +192,36 @@ Status JvmMetric::InitMetrics(MetricGroup* metrics) {
return Status::OK();
}
-void JvmMetric::CalculateValue() {
+int64_t JvmMetric::GetValue() {
TGetJvmMetricsRequest request;
request.get_all = false;
request.__set_memory_pool(mempool_name_);
TGetJvmMetricsResponse response;
- if (!JniUtil::GetJvmMetrics(request, &response).ok()) return;
- if (response.memory_pools.size() != 1) return;
+ if (!JniUtil::GetJvmMetrics(request, &response).ok()) return 0;
+ if (response.memory_pools.size() != 1) return 0;
TJvmMemoryPool& pool = response.memory_pools[0];
DCHECK(pool.name == mempool_name_);
switch (metric_type_) {
- case MAX: value_ = pool.max;
- return;
- case INIT: value_ = pool.init;
- return;
- case CURRENT: value_ = pool.used;
- return;
- case COMMITTED: value_ = pool.committed;
- return;
- case PEAK_MAX: value_ = pool.peak_max;
- return;
- case PEAK_INIT: value_ = pool.peak_init;
- return;
- case PEAK_CURRENT: value_ = pool.peak_used;
- return;
- case PEAK_COMMITTED: value_ = pool.peak_committed;
- return;
+ case MAX:
+ return pool.max;
+ case INIT:
+ return pool.init;
+ case CURRENT:
+ return pool.used;
+ case COMMITTED:
+ return pool.committed;
+ case PEAK_MAX:
+ return pool.peak_max;
+ case PEAK_INIT:
+ return pool.peak_init;
+ case PEAK_CURRENT:
+ return pool.peak_used;
+ case PEAK_COMMITTED:
+ return pool.peak_committed;
default:
DCHECK(false) << "Unknown JvmMetricType: " << metric_type_;
}
+ return 0;
}
Status BufferPoolMetric::InitMetrics(MetricGroup* metrics,
@@ -263,47 +264,39 @@ BufferPoolMetric::BufferPoolMetric(const TMetricDef& def, BufferPoolMetricType t
global_reservations_(global_reservations),
buffer_pool_(buffer_pool) {}
-void BufferPoolMetric::CalculateValue() {
+int64_t BufferPoolMetric::GetValue() {
// IMPALA-6362: we have to be careful that none of the below calls to ReservationTracker
// methods acquire ReservationTracker::lock_ to avoid a potential circular dependency
// with MemTracker::child_trackers_lock_, which may be held when refreshing MemTracker
// consumption.
switch (type_) {
case BufferPoolMetricType::LIMIT:
- value_ = buffer_pool_->GetSystemBytesLimit();
- break;
+ return buffer_pool_->GetSystemBytesLimit();
case BufferPoolMetricType::SYSTEM_ALLOCATED:
- value_ = buffer_pool_->GetSystemBytesAllocated();
- break;
+ return buffer_pool_->GetSystemBytesAllocated();
case BufferPoolMetricType::RESERVED:
- value_ = global_reservations_->GetReservation();
- break;
+ return global_reservations_->GetReservation();
case BufferPoolMetricType::UNUSED_RESERVATION_BYTES: {
// Estimate the unused reservation based on other aggregate values, defined as
// the total bytes of reservation where there is no corresponding buffer in use
// by a client. Buffers are either in-use, free buffers, or attached to clean pages.
int64_t total_used_reservation = buffer_pool_->GetSystemBytesAllocated()
- - buffer_pool_->GetFreeBufferBytes()
- - buffer_pool_->GetCleanPageBytes();
- value_ = global_reservations_->GetReservation() - total_used_reservation;
- break;
+ - buffer_pool_->GetFreeBufferBytes()
+ - buffer_pool_->GetCleanPageBytes();
+ return global_reservations_->GetReservation() - total_used_reservation;
}
case BufferPoolMetricType::NUM_FREE_BUFFERS:
- value_ = buffer_pool_->GetNumFreeBuffers();
- break;
+ return buffer_pool_->GetNumFreeBuffers();
case BufferPoolMetricType::FREE_BUFFER_BYTES:
- value_ = buffer_pool_->GetFreeBufferBytes();
- break;
+ return buffer_pool_->GetFreeBufferBytes();
case BufferPoolMetricType::CLEAN_PAGES_LIMIT:
- value_ = buffer_pool_->GetCleanPageBytesLimit();
- break;
+ return buffer_pool_->GetCleanPageBytesLimit();
case BufferPoolMetricType::NUM_CLEAN_PAGES:
- value_ = buffer_pool_->GetNumCleanPages();
- break;
+ return buffer_pool_->GetNumCleanPages();
case BufferPoolMetricType::CLEAN_PAGE_BYTES:
- value_ = buffer_pool_->GetCleanPageBytes();
- break;
+ return buffer_pool_->GetCleanPageBytes();
default:
DCHECK(false) << "Unknown BufferPoolMetricType: " << static_cast<int>(type_);
}
+ return 0;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 3294c30..6c10e09 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -44,7 +44,7 @@ class AggregateMemoryMetrics {
/// including JVM memory), which is either in use by queries or cached by the BufferPool
/// or the malloc implementation.
/// TODO: IMPALA-691 - consider changing this to include JVM memory.
- static SumGauge<int64_t>* TOTAL_USED;
+ static SumGauge* TOTAL_USED;
/// The total number of virtual memory regions for the process.
/// The value must be refreshed by calling Refresh().
@@ -106,9 +106,8 @@ class TcmallocMetric : public IntGauge {
public:
PhysicalBytesMetric(const TMetricDef& def) : IntGauge(def, 0) { }
- private:
- virtual void CalculateValue() {
- value_ = TOTAL_BYTES_RESERVED->value() - PAGEHEAP_UNMAPPED_BYTES->value();
+ virtual int64_t GetValue() override {
+ return TOTAL_BYTES_RESERVED->GetValue() - PAGEHEAP_UNMAPPED_BYTES->GetValue();
}
};
@@ -117,20 +116,21 @@ class TcmallocMetric : public IntGauge {
static TcmallocMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
const std::string& tcmalloc_var);
+ virtual int64_t GetValue() override {
+ int64_t retval = 0;
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
+ MallocExtension::instance()->GetNumericProperty(tcmalloc_var_.c_str(),
+ reinterpret_cast<size_t*>(&retval));
+#endif
+ return retval;
+ }
+
private:
/// Name of the tcmalloc property this metric should fetch.
const std::string tcmalloc_var_;
TcmallocMetric(const TMetricDef& def, const std::string& tcmalloc_var)
- : IntGauge(def, 0), tcmalloc_var_(tcmalloc_var) { }
-
- virtual void CalculateValue() {
-#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
- DCHECK_EQ(sizeof(size_t), sizeof(value_));
- MallocExtension::instance()->GetNumericProperty(tcmalloc_var_.c_str(),
- reinterpret_cast<size_t*>(&value_));
-#endif
- }
+ : IntGauge(def, 0), tcmalloc_var_(tcmalloc_var) { }
};
/// Alternative to TCMallocMetric if we're running under a sanitizer that replaces
@@ -138,12 +138,16 @@ class TcmallocMetric : public IntGauge {
class SanitizerMallocMetric : public IntGauge {
public:
SanitizerMallocMetric(const TMetricDef& def) : IntGauge(def, 0) {}
+
static SanitizerMallocMetric* BYTES_ALLOCATED;
- private:
- virtual void CalculateValue() override {
+
+ virtual int64_t GetValue() override {
#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
- value_ = __sanitizer_get_current_allocated_bytes();
+ return __sanitizer_get_current_allocated_bytes();
+#else
+ return 0;
#endif
+
}
};
@@ -157,10 +161,9 @@ class JvmMetric : public IntGauge {
/// pool (usually ~5 pools plus a synthetic 'total' pool).
static Status InitMetrics(MetricGroup* metrics) WARN_UNUSED_RESULT;
- protected:
/// Searches through jvm_metrics_response_ for a matching memory pool and pulls out the
/// right value from that structure according to metric_type_.
- virtual void CalculateValue();
+ virtual int64_t GetValue() override;
private:
/// Each names one of the fields in TJvmMemoryPool.
@@ -206,8 +209,7 @@ class BufferPoolMetric : public IntGauge {
static BufferPoolMetric* NUM_CLEAN_PAGES;
static BufferPoolMetric* CLEAN_PAGE_BYTES;
- protected:
- virtual void CalculateValue();
+ virtual int64_t GetValue() override;
private:
friend class ReservationTrackerTest;
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc
index 0126281..bfbfdfe 100644
--- a/be/src/util/metrics-test.cc
+++ b/be/src/util/metrics-test.cc
@@ -36,7 +36,7 @@ namespace impala {
template <typename M, typename T>
void AssertValue(M* metric, const T& value,
const string& human_readable) {
- EXPECT_EQ(metric->value(), value);
+ EXPECT_EQ(metric->GetValue(), value);
if (!human_readable.empty()) {
EXPECT_EQ(metric->ToHumanReadable(), human_readable);
}
@@ -73,36 +73,36 @@ class MetricsTest : public testing::Test {
TEST_F(MetricsTest, CounterMetrics) {
MetricGroup metrics("CounterMetrics");
AddMetricDef("counter", TMetricKind::COUNTER, TUnit::UNIT);
- IntCounter* int_counter = metrics.AddCounter<int64_t>("counter", 0);
+ IntCounter* int_counter = metrics.AddCounter("counter", 0);
AssertValue(int_counter, 0, "0");
int_counter->Increment(1);
AssertValue(int_counter, 1, "1");
int_counter->Increment(10);
AssertValue(int_counter, 11, "11");
- int_counter->set_value(3456);
+ int_counter->SetValue(3456);
AssertValue(int_counter, 3456, "3.46K");
AddMetricDef("counter_with_units", TMetricKind::COUNTER, TUnit::BYTES);
IntCounter* int_counter_with_units =
- metrics.AddCounter<int64_t>("counter_with_units", 10);
+ metrics.AddCounter("counter_with_units", 10);
AssertValue(int_counter_with_units, 10, "10.00 B");
}
TEST_F(MetricsTest, GaugeMetrics) {
MetricGroup metrics("GaugeMetrics");
AddMetricDef("gauge", TMetricKind::GAUGE, TUnit::NONE);
- IntGauge* int_gauge = metrics.AddGauge<int64_t>("gauge", 0);
+ IntGauge* int_gauge = metrics.AddGauge("gauge", 0);
AssertValue(int_gauge, 0, "0");
int_gauge->Increment(-1);
AssertValue(int_gauge, -1, "-1");
int_gauge->Increment(10);
AssertValue(int_gauge, 9, "9");
- int_gauge->set_value(3456);
+ int_gauge->SetValue(3456);
AssertValue(int_gauge, 3456, "3456");
AddMetricDef("gauge_with_units", TMetricKind::GAUGE, TUnit::TIME_S);
IntGauge* int_gauge_with_units =
- metrics.AddGauge<int64_t>("gauge_with_units", 10);
+ metrics.AddGauge("gauge_with_units", 10);
AssertValue(int_gauge_with_units, 10, "10s000ms");
}
@@ -111,12 +111,12 @@ TEST_F(MetricsTest, SumGauge) {
AddMetricDef("gauge1", TMetricKind::GAUGE, TUnit::NONE);
AddMetricDef("gauge2", TMetricKind::GAUGE, TUnit::NONE);
AddMetricDef("sum", TMetricKind::GAUGE, TUnit::NONE);
- IntGauge* gauge1 = metrics.AddGauge<int64_t>("gauge1", 0);
- IntGauge* gauge2 = metrics.AddGauge<int64_t>("gauge2", 0);
+ IntGauge* gauge1 = metrics.AddGauge("gauge1", 0);
+ IntGauge* gauge2 = metrics.AddGauge("gauge2", 0);
vector<IntGauge*> gauges({gauge1, gauge2});
IntGauge* sum_gauge =
- metrics.RegisterMetric(new SumGauge<int64_t>(MetricDefs::Get("sum"), gauges));
+ metrics.RegisterMetric(new SumGauge(MetricDefs::Get("sum"), gauges));
AssertValue(sum_gauge, 0, "0");
gauge1->Increment(1);
@@ -132,14 +132,14 @@ TEST_F(MetricsTest, PropertyMetrics) {
AddMetricDef("bool_property", TMetricKind::PROPERTY, TUnit::NONE);
BooleanProperty* bool_property = metrics.AddProperty("bool_property", false);
AssertValue(bool_property, false, "false");
- bool_property->set_value(true);
+ bool_property->SetValue(true);
AssertValue(bool_property, true, "true");
AddMetricDef("string_property", TMetricKind::PROPERTY, TUnit::NONE);
StringProperty* string_property = metrics.AddProperty("string_property",
string("string1"));
AssertValue(string_property, "string1", "string1");
- string_property->set_value("string2");
+ string_property->SetValue("string2");
AssertValue(string_property, "string2", "string2");
}
@@ -147,11 +147,11 @@ TEST_F(MetricsTest, NonFiniteValues) {
MetricGroup metrics("NanValues");
AddMetricDef("inf_value", TMetricKind::GAUGE, TUnit::NONE);
double inf = numeric_limits<double>::infinity();
- DoubleGauge* gauge = metrics.AddGauge("inf_value", inf);
+ DoubleGauge* gauge = metrics.AddDoubleGauge("inf_value", inf);
AssertValue(gauge, inf, "inf");
double nan = numeric_limits<double>::quiet_NaN();
- gauge->set_value(nan);
- EXPECT_TRUE(std::isnan(gauge->value()));
+ gauge->SetValue(nan);
+ EXPECT_TRUE(std::isnan(gauge->GetValue()));
EXPECT_TRUE(gauge->ToHumanReadable() == "nan");
}
@@ -223,19 +223,19 @@ TEST_F(MetricsTest, MemMetric) {
metrics.FindMetricForTesting<IntGauge>("tcmalloc.bytes-in-use");
ASSERT_TRUE(bytes_in_use != NULL);
- uint64_t cur_in_use = bytes_in_use->value();
+ uint64_t cur_in_use = bytes_in_use->GetValue();
EXPECT_GT(cur_in_use, 0);
// Allocate 100MB to increase the number of bytes used. TCMalloc may also give up some
// bytes during this allocation, so this allocation is deliberately large to ensure that
// the bytes used metric goes up net.
scoped_ptr<vector<uint64_t>> chunk(new vector<uint64_t>(100 * 1024 * 1024));
- EXPECT_GT(bytes_in_use->value(), cur_in_use);
+ EXPECT_GT(bytes_in_use->GetValue(), cur_in_use);
IntGauge* total_bytes_reserved =
metrics.FindMetricForTesting<IntGauge>("tcmalloc.total-bytes-reserved");
ASSERT_TRUE(total_bytes_reserved != NULL);
- ASSERT_GT(total_bytes_reserved->value(), 0);
+ ASSERT_GT(total_bytes_reserved->GetValue(), 0);
IntGauge* pageheap_free_bytes =
metrics.FindMetricForTesting<IntGauge>("tcmalloc.pageheap-free-bytes");
@@ -254,12 +254,12 @@ TEST_F(MetricsTest, JvmMetrics) {
metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<IntGauge>(
"jvm.total.current-usage-bytes");
ASSERT_TRUE(jvm_total_used != NULL);
- EXPECT_GT(jvm_total_used->value(), 0);
+ EXPECT_GT(jvm_total_used->GetValue(), 0);
IntGauge* jvm_peak_total_used =
metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<IntGauge>(
"jvm.total.peak-current-usage-bytes");
ASSERT_TRUE(jvm_peak_total_used != NULL);
- EXPECT_GT(jvm_peak_total_used->value(), 0);
+ EXPECT_GT(jvm_peak_total_used->GetValue(), 0);
}
void AssertJson(const Value& val, const string& name, const string& value,
@@ -274,7 +274,7 @@ void AssertJson(const Value& val, const string& name, const string& value,
TEST_F(MetricsTest, CountersJson) {
MetricGroup metrics("CounterMetrics");
AddMetricDef("counter", TMetricKind::COUNTER, TUnit::UNIT, "description");
- metrics.AddCounter<int64_t>("counter", 0);
+ metrics.AddCounter("counter", 0);
Document document;
Value val;
metrics.ToJson(true, &document, &val);
@@ -286,7 +286,7 @@ TEST_F(MetricsTest, CountersJson) {
TEST_F(MetricsTest, GaugesJson) {
MetricGroup metrics("GaugeMetrics");
AddMetricDef("gauge", TMetricKind::GAUGE, TUnit::NONE);
- metrics.AddGauge<int64_t>("gauge", 10);
+ metrics.AddGauge("gauge", 10);
Document document;
Value val;
metrics.ToJson(true, &document, &val);
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 12d6df3..b513c1e 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -27,6 +27,7 @@
#include <boost/scoped_ptr.hpp>
#include <boost/thread/locks.hpp>
+#include "common/atomic.h"
#include "common/logging.h"
#include "common/object-pool.h"
#include "common/status.h"
@@ -118,59 +119,37 @@ class Metric {
void AddStandardFields(rapidjson::Document* document, rapidjson::Value* val);
};
-/// A SimpleMetric has a value which is a simple primitive type: e.g. integers, strings and
-/// floats. It is parameterised not only by the type of its value, but by both the unit
-/// (e.g. bytes/s), drawn from TUnit and the 'kind' of the metric itself. The kind
-/// can be one of: 'gauge', which may increase or decrease over time, a 'counter' which is
-/// increasing only over time, or a 'property' which is not numeric.
-//
-/// SimpleMetrics return their current value through the value() method. Access to value()
-/// is thread-safe.
-//
-/// TODO: We can use type traits to select a more efficient lock-free implementation of
-/// value() etc. where it is safe to do so.
-/// TODO: CalculateValue() can be returning a value, its current interface is not clean.
-template<typename T, TMetricKind::type metric_kind=TMetricKind::GAUGE>
-class SimpleMetric : public Metric {
+/// A ScalarMetric has a value which is a simple primitive type: e.g. integers, strings
+/// and floats. It is parameterised not only by the type of its value, but by both the
+/// unit (e.g. bytes/s), drawn from TUnit and the 'kind' of the metric itself.
+/// The kind can be one of:
+/// - 'gauge', which may increase or decrease over time,
+/// - 'counter' which can only increase over time
+/// - 'property' which is a value store which can be read and written only
+///
+/// Note that management software may use the metric kind as a hint on how to display
+/// the value. ScalarMetrics return their current value through the GetValue() method
+/// and set/initialize the value with SetValue(). Both methods are thread safe.
+template<typename T, TMetricKind::type metric_kind_t>
+class ScalarMetric: public Metric {
public:
- SimpleMetric(const TMetricDef& metric_def, const T& initial_value)
- : Metric(metric_def), unit_(metric_def.units), value_(initial_value) {
- DCHECK_EQ(metric_kind, metric_def.kind) << "Metric kind does not match definition: "
+ ScalarMetric(const TMetricDef& metric_def)
+ : Metric(metric_def), unit_(metric_def.units) {
+ DCHECK_EQ(metric_kind_t, metric_def.kind) << "Metric kind does not match definition: "
<< metric_def.key;
}
- virtual ~SimpleMetric() { }
-
- /// Returns the current value, updating it if necessary. Thread-safe.
- T value() {
- boost::lock_guard<SpinLock> l(lock_);
- CalculateValue();
- return value_;
- }
-
- /// Sets the current value. Thread-safe.
- void set_value(const T& value) {
- boost::lock_guard<SpinLock> l(lock_);
- value_ = value;
- }
+ virtual ~ScalarMetric() { }
- /// Adds 'delta' to the current value atomically.
- void Increment(const T& delta) {
- DCHECK(kind() != TMetricKind::PROPERTY)
- << "Can't change value of PROPERTY metric: " << key();
- DCHECK(kind() != TMetricKind::COUNTER || delta >= 0)
- << "Can't decrement value of COUNTER metric: " << key();
- if (delta == 0) return;
- boost::lock_guard<SpinLock> l(lock_);
- value_ += delta;
- }
+ /// Returns the current value. Thread-safe.
+ virtual T GetValue() = 0;
- virtual void ToJson(rapidjson::Document* document, rapidjson::Value* val) {
+ virtual void ToJson(rapidjson::Document* document, rapidjson::Value* val) override {
rapidjson::Value container(rapidjson::kObjectType);
AddStandardFields(document, &container);
rapidjson::Value metric_value;
- ToJsonValue(value(), TUnit::NONE, document, &metric_value);
+ ToJsonValue(GetValue(), TUnit::NONE, document, &metric_value);
container.AddMember("value", metric_value, document->GetAllocator());
rapidjson::Value type_value(PrintTMetricKind(kind()).c_str(),
@@ -181,30 +160,46 @@ class SimpleMetric : public Metric {
*val = container;
}
- virtual std::string ToHumanReadable() {
- return PrettyPrinter::Print(value(), unit());
+ virtual std::string ToHumanReadable() override {
+ return PrettyPrinter::Print(GetValue(), unit());
}
- virtual void ToLegacyJson(rapidjson::Document* document) {
+ virtual void ToLegacyJson(rapidjson::Document* document) override {
rapidjson::Value val;
- ToJsonValue(value(), TUnit::NONE, document, &val);
+ ToJsonValue(GetValue(), TUnit::NONE, document, &val);
document->AddMember(key_.c_str(), val, document->GetAllocator());
}
TUnit::type unit() const { return unit_; }
- TMetricKind::type kind() const { return metric_kind; }
+ TMetricKind::type kind() const { return metric_kind_t; }
protected:
- /// Called to compute value_ if necessary during calls to value(). The more natural
- /// approach would be to have virtual T value(), but that's not possible in C++.
- //
- /// TODO: Should be cheap to have a blank implementation, but if required we can cause
- /// the compiler to avoid calling this entirely through a compile-time constant.
- virtual void CalculateValue() { }
-
/// Units of this metric.
const TUnit::type unit_;
+};
+/// An implementation of scalar metric with spinlock.
+template<typename T, TMetricKind::type metric_kind_t>
+class LockedMetric : public ScalarMetric<T, metric_kind_t> {
+ public:
+ LockedMetric(const TMetricDef& metric_def, const T& initial_value)
+ : ScalarMetric<T, metric_kind_t>(metric_def), value_(initial_value) {}
+
+ virtual ~LockedMetric() {}
+
+ /// Atomically reads the current value.
+ virtual T GetValue() override {
+ boost::lock_guard<SpinLock> l(lock_);
+ return value_;
+ }
+
+ /// Atomically sets the value.
+ void SetValue(const T& value) {
+ boost::lock_guard<SpinLock> l(lock_);
+ value_ = value;
+ }
+
+ protected:
/// Guards access to value_.
SpinLock lock_;
@@ -212,42 +207,81 @@ class SimpleMetric : public Metric {
T value_;
};
-// Gauge metric that computes the sum of several gauges.
-template <typename T>
-class SumGauge : public SimpleMetric<T, TMetricKind::GAUGE> {
+typedef class LockedMetric<bool, TMetricKind::PROPERTY> BooleanProperty;
+typedef class LockedMetric<std::string,TMetricKind::PROPERTY> StringProperty;
+typedef class LockedMetric<double, TMetricKind::GAUGE> DoubleGauge;
+
+/// An implementation of 'gauge' or 'counter' metric kind. The metric can be incremented
+/// atomically via the Increment() interface.
+template<TMetricKind::type metric_kind_t>
+class AtomicMetric : public ScalarMetric<int64_t, metric_kind_t> {
public:
- SumGauge(const TMetricDef& metric_def,
- const std::vector<SimpleMetric<T, TMetricKind::GAUGE>*>& metrics)
- : SimpleMetric<T, TMetricKind::GAUGE>(metric_def, 0), metrics_(metrics) {}
+ AtomicMetric(const TMetricDef& metric_def, const int64_t initial_value)
+ : ScalarMetric<int64_t, metric_kind_t>(metric_def), value_(initial_value) {
+ DCHECK(metric_kind_t == TMetricKind::GAUGE || metric_kind_t == TMetricKind::COUNTER);
+ }
+
+ virtual ~AtomicMetric() {}
+
+ /// Atomically reads the current value. May be overridden by derived classes.
+ /// The default implementation just atomically loads 'value_'. Derived classes
+ /// which derive the return value from multiple sources other than 'value_'
+ /// need to take care of synchronization among sources.
+ virtual int64_t GetValue() override { return value_.Load(); }
+
+ /// Atomically sets the value.
+ void SetValue(const int64_t& value) { value_.Store(value); }
+
+ /// Adds 'delta' to the current value atomically.
+ void Increment(int64_t delta) {
+ DCHECK(metric_kind_t != TMetricKind::COUNTER || delta >= 0)
+ << "Can't decrement value of COUNTER metric: " << this->key();
+ value_.Add(delta);
+ }
+
+ protected:
+ /// The current value of the metric.
+ AtomicInt64 value_;
+};
+
+/// We write 'Int' as a placeholder for all integer types.
+typedef class AtomicMetric<TMetricKind::GAUGE> IntGauge;
+typedef class AtomicMetric<TMetricKind::COUNTER> IntCounter;
+
+/// Gauge metric that computes the sum of several gauges.
+class SumGauge : public IntGauge {
+ public:
+ SumGauge(const TMetricDef& metric_def, const std::vector<IntGauge*>& gauges)
+ : IntGauge(metric_def, 0), gauges_(gauges) {}
+
virtual ~SumGauge() {}
- private:
- virtual void CalculateValue() override {
- T sum = 0;
- for (SimpleMetric<T, TMetricKind::GAUGE>* metric : metrics_) sum += metric->value();
- this->value_ = sum;
+ virtual int64_t GetValue() override {
+ // Note that this doesn't hold the locks of all gauges before computing the sum so
+ // it's possible for one of the gauges to change after being read and added to sum.
+ int64_t sum = 0;
+ for (auto gauge : gauges_) sum += gauge->GetValue();
+ return sum;
}
- /// The metrics to be summed.
- std::vector<SimpleMetric<T, TMetricKind::GAUGE>*> metrics_;
+ private:
+ /// The gauges to be summed.
+ std::vector<IntGauge*> gauges_;
};
-// Gauge metric that negates another gauge.
-template <typename T>
-class NegatedGauge : public SimpleMetric<T, TMetricKind::GAUGE> {
+/// Gauge metric that negates another gauge.
+class NegatedGauge : public IntGauge {
public:
- NegatedGauge(const TMetricDef& metric_def,
- SimpleMetric<T, TMetricKind::GAUGE>* metric)
- : SimpleMetric<T, TMetricKind::GAUGE>(metric_def, 0), metric_(metric) {}
+ NegatedGauge(const TMetricDef& metric_def, IntGauge* gauge)
+ : IntGauge(metric_def, 0), gauge_(gauge) {}
+
virtual ~NegatedGauge() {}
- private:
- virtual void CalculateValue() override {
- this->value_ = -metric_->value();
- }
+ virtual int64_t GetValue() override { return -gauge_->GetValue(); }
+ private:
/// The metric to be negated.
- SimpleMetric<T, TMetricKind::GAUGE>* metric_;
+ IntGauge* gauge_;
};
/// Container for a set of metrics. A MetricGroup owns the memory for every metric
@@ -285,27 +319,28 @@ class MetricGroup {
}
/// Create a gauge metric object with given key and initial value (owned by this object)
- template<typename T>
- SimpleMetric<T>* AddGauge(const std::string& key, const T& value,
+ IntGauge* AddGauge(const std::string& key, const int64_t value,
const std::string& metric_def_arg = "") {
- return RegisterMetric(new SimpleMetric<T, TMetricKind::GAUGE>(
- MetricDefs::Get(key, metric_def_arg), value));
+ return RegisterMetric(new IntGauge(MetricDefs::Get(key, metric_def_arg), value));
}
- template<typename T>
- SimpleMetric<T, TMetricKind::PROPERTY>* AddProperty(const std::string& key,
- const T& value, const std::string& metric_def_arg = "") {
- return RegisterMetric(new SimpleMetric<T, TMetricKind::PROPERTY>(
- MetricDefs::Get(key, metric_def_arg), value));
+ DoubleGauge* AddDoubleGauge(const std::string& key, const double value,
+ const std::string& metric_def_arg = "") {
+ return RegisterMetric(new DoubleGauge(MetricDefs::Get(key, metric_def_arg), value));
}
template<typename T>
- SimpleMetric<T, TMetricKind::COUNTER>* AddCounter(const std::string& key,
+ LockedMetric<T, TMetricKind::PROPERTY>* AddProperty(const std::string& key,
const T& value, const std::string& metric_def_arg = "") {
- return RegisterMetric(new SimpleMetric<T, TMetricKind::COUNTER>(
+ return RegisterMetric(new LockedMetric<T, TMetricKind::PROPERTY>(
MetricDefs::Get(key, metric_def_arg), value));
}
+ IntCounter* AddCounter(const std::string& key, const int64_t value,
+ const std::string& metric_def_arg = "") {
+ return RegisterMetric(new IntCounter(MetricDefs::Get(key, metric_def_arg), value));
+ }
+
/// Returns a metric by key. All MetricGroups reachable from this group are searched in
/// depth-first order, starting with the root group. Returns NULL if there is no metric
/// with that key. This is not a very cheap operation; the result should be cached where
@@ -380,13 +415,6 @@ class MetricGroup {
rapidjson::Document* document);
};
-/// We write 'Int' as a placeholder for all integer types.
-typedef class SimpleMetric<int64_t, TMetricKind::GAUGE> IntGauge;
-typedef class SimpleMetric<double, TMetricKind::GAUGE> DoubleGauge;
-typedef class SimpleMetric<int64_t, TMetricKind::COUNTER> IntCounter;
-
-typedef class SimpleMetric<bool, TMetricKind::PROPERTY> BooleanProperty;
-typedef class SimpleMetric<std::string, TMetricKind::PROPERTY> StringProperty;
/// Convenience method to instantiate a TMetricDef with a subset of its fields defined.
/// Most externally-visible metrics should be defined in metrics.json and retrieved via
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index 536119b..8397f35 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -194,10 +194,8 @@ Status ThreadMgr::StartInstrumentation(MetricGroup* metrics) {
DCHECK(metrics != NULL);
lock_guard<mutex> l(lock_);
metrics_enabled_ = true;
- total_threads_metric_ = metrics->AddGauge<int64_t>(
- "thread-manager.total-threads-created", 0L);
- current_num_threads_metric_ = metrics->AddGauge<int64_t>(
- "thread-manager.running-threads", 0L);
+ total_threads_metric_ = metrics->AddGauge("thread-manager.total-threads-created", 0L);
+ current_num_threads_metric_ = metrics->AddGauge("thread-manager.running-threads", 0L);
return Status::OK();
}
@@ -224,7 +222,7 @@ void ThreadMgr::RemoveThread(const thread::id& boost_id, const string& category)
void ThreadMgr::GetThreadOverview(Document* document) {
lock_guard<mutex> l(lock_);
if (metrics_enabled_) {
- document->AddMember("total_threads", current_num_threads_metric_->value(),
+ document->AddMember("total_threads", current_num_threads_metric_->GetValue(),
document->GetAllocator());
}
Value lst(kArrayType);
http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index dafe986..f493d33 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -882,7 +882,7 @@
"IMPALAD"
],
"label": "StateStore Subscriber Last Recovery Duration",
- "units": "NONE",
+ "units": "TIME_S",
"kind": "GAUGE",
"key": "statestore-subscriber.last-recovery-duration"
},