You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/01/18 23:55:54 UTC

[3/3] impala git commit: IMPALA-2397: Use atomics for IntGauge and IntCounter

IMPALA-2397: Use atomics for IntGauge and IntCounter

This change removes the spinlock in IntGauge and IntCounter
and uses AtomicInt64 instead. As shown in IMPALA-2397, multiple
threads can be contending for the spinlocks of some global metrics
under concurrent queries.

This change also renames SimpleMetric to ScalarMetric and breaks it
up into two subclasses:
- LockedMetric:
  - a value store for any primitive type (int, float, string, etc.).
  - atomic read and write via GetValue() and SetValue() respectively.

- AtomicMetric:
  - the basis of IntGauge and IntCounter. Supports atomic increment
    of the metric value via the Increment() interface.
  - atomic read and write via GetValue() and SetValue() respectively.
  - only supports the int64_t type.

Change-Id: I48dfa5443cd771916b53541a0ffeaf1bcc7e7606
Reviewed-on: http://gerrit.cloudera.org:8080/9012
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e714f2b3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e714f2b3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e714f2b3

Branch: refs/heads/master
Commit: e714f2b33c5b64d5680dbc15e166759930f04560
Parents: b3d38b5
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Jan 10 19:28:09 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 18 23:31:52 2018 +0000

----------------------------------------------------------------------
 be/src/exec/external-data-source-executor.cc |   8 +-
 be/src/rpc/TAcceptQueueServer.cpp            |   2 +-
 be/src/rpc/thrift-server.cc                  |   4 +-
 be/src/runtime/client-cache.cc               |   6 +-
 be/src/runtime/data-stream-mgr.cc            |   6 +-
 be/src/runtime/exec-env.cc                   |  10 +-
 be/src/runtime/io/scan-range.cc              |   4 +-
 be/src/runtime/krpc-data-stream-mgr.cc       |   6 +-
 be/src/runtime/mem-tracker-test.cc           |   4 +-
 be/src/runtime/mem-tracker.cc                |  10 +-
 be/src/runtime/mem-tracker.h                 |   4 +-
 be/src/runtime/query-exec-mgr.cc             |   2 +-
 be/src/runtime/query-state.cc                |   6 +-
 be/src/runtime/tmp-file-mgr-test.cc          |   2 +-
 be/src/runtime/tmp-file-mgr.cc               |   4 +-
 be/src/scheduling/admission-controller.cc    |  58 +++---
 be/src/scheduling/scheduler.cc               |   9 +-
 be/src/service/impala-server.cc              |  10 +-
 be/src/service/session-expiry-test.cc        |  12 +-
 be/src/statestore/statestore-subscriber.cc   |  14 +-
 be/src/statestore/statestore.cc              |  11 +-
 be/src/util/common-metrics.cc                |   2 +-
 be/src/util/default-path-handlers.cc         |   2 +-
 be/src/util/impalad-metrics.cc               |  66 +++----
 be/src/util/memory-metrics.cc                |  99 +++++-----
 be/src/util/memory-metrics.h                 |  42 ++--
 be/src/util/metrics-test.cc                  |  44 ++---
 be/src/util/metrics.h                        | 222 ++++++++++++----------
 be/src/util/thread.cc                        |   8 +-
 common/thrift/metrics.json                   |   2 +-
 30 files changed, 350 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc
index 7c810c6..7c54f39 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -76,9 +76,9 @@ class ExternalDataSourceExecutor::JniState {
         "getNumClassCacheMisses", "()J");
     RETURN_ERROR_IF_EXC(env);
 
-    num_class_cache_hits_ = metrics->AddCounter<int64_t>(
+    num_class_cache_hits_ = metrics->AddCounter(
         "external-data-source.class-cache.hits", 0);
-    num_class_cache_misses_ = metrics->AddCounter<int64_t>(
+    num_class_cache_misses_ = metrics->AddCounter(
         "external-data-source.class-cache.misses", 0);
     return Status::OK();
   }
@@ -92,11 +92,11 @@ class ExternalDataSourceExecutor::JniState {
     int64_t num_cache_hits = env->CallStaticLongMethod(executor_class_,
         get_num_cache_hits_id_);
     RETURN_ERROR_IF_EXC(env);
-    num_class_cache_hits_->set_value(num_cache_hits);
+    num_class_cache_hits_->SetValue(num_cache_hits);
     int64_t num_cache_misses = env->CallStaticLongMethod(executor_class_,
         get_num_cache_misses_id_);
     RETURN_ERROR_IF_EXC(env);
-    num_class_cache_misses_->set_value(num_cache_misses);
+    num_class_cache_misses_->SetValue(num_cache_misses);
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 8a398a2..5c1b1da 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -286,7 +286,7 @@ void TAcceptQueueServer::InitMetrics(MetricGroup* metrics, const string& key_pre
   DCHECK(metrics != NULL);
   stringstream queue_size_ss;
   queue_size_ss << key_prefix << ".connection-setup-queue-size";
-  queue_size_metric_ = metrics->AddGauge<int64_t>(queue_size_ss.str(), 0);
+  queue_size_metric_ = metrics->AddGauge(queue_size_ss.str(), 0);
   metrics_enabled_ = true;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index ab51315..eaca699 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -342,10 +342,10 @@ ThriftServer::ThriftServer(const string& name,
     metrics_enabled_ = true;
     stringstream count_ss;
     count_ss << "impala.thrift-server." << name << ".connections-in-use";
-    num_current_connections_metric_ = metrics->AddGauge<int64_t>(count_ss.str(), 0);
+    num_current_connections_metric_ = metrics->AddGauge(count_ss.str(), 0);
     stringstream max_ss;
     max_ss << "impala.thrift-server." << name << ".total-connections";
-    total_connections_metric_ = metrics->AddCounter<int64_t>(max_ss.str(), 0);
+    total_connections_metric_ = metrics->AddCounter(max_ss.str(), 0);
     metrics_ = metrics;
   } else {
     metrics_enabled_ = false;

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/client-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index 8c0b6aa..af530f7 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -94,7 +94,7 @@ Status ClientCacheHelper::ReopenClient(ClientFactory factory_method,
     // CreateClient() will increment total_clients_metric_ if succeed.
     if (metrics_enabled_) {
       total_clients_metric_->Increment(-1);
-      DCHECK_GE(total_clients_metric_->value(), 0);
+      DCHECK_GE(total_clients_metric_->GetValue(), 0);
     }
     lock_guard<mutex> lock(client_map_lock_);
     client_map_.erase(client);
@@ -235,11 +235,11 @@ void ClientCacheHelper::InitMetrics(MetricGroup* metrics, const string& key_pref
   lock_guard<mutex> lock(cache_lock_);
   stringstream count_ss;
   count_ss << key_prefix << ".client-cache.clients-in-use";
-  clients_in_use_metric_ = metrics->AddGauge<int64_t>(count_ss.str(), 0);
+  clients_in_use_metric_ = metrics->AddGauge(count_ss.str(), 0);
 
   stringstream max_ss;
   max_ss << key_prefix << ".client-cache.total-clients";
-  total_clients_metric_ = metrics->AddGauge<int64_t>(max_ss.str(), 0);
+  total_clients_metric_ = metrics->AddGauge(max_ss.str(), 0);
   metrics_enabled_ = true;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index 93c524e..45eee7f 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -57,10 +57,10 @@ namespace impala {
 DataStreamMgr::DataStreamMgr(MetricGroup* metrics) {
   metrics_ = metrics->GetOrCreateChildGroup("datastream-manager");
   num_senders_waiting_ =
-      metrics_->AddGauge<int64_t>("senders-blocked-on-recvr-creation", 0L);
+      metrics_->AddGauge("senders-blocked-on-recvr-creation", 0L);
   total_senders_waited_ =
-      metrics_->AddCounter<int64_t>("total-senders-blocked-on-recvr-creation", 0L);
-  num_senders_timedout_ = metrics_->AddCounter<int64_t>(
+      metrics_->AddCounter("total-senders-blocked-on-recvr-creation", 0L);
+  num_senders_timedout_ = metrics_->AddCounter(
       "total-senders-timedout-waiting-for-recvr-creation", 0L);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 6d9a857..f191921 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -333,9 +333,9 @@ Status ExecEnv::Init() {
   // Also need a MemTracker for unused reservations as a negative value. Unused
   // reservations are counted against queries but not against the process memory
   // consumption. This accounts for that difference.
-  IntGauge* negated_unused_reservation = obj_pool_->Add(new NegatedGauge<int64_t>(
-        MakeTMetricDef("negated_unused_reservation", TMetricKind::GAUGE, TUnit::BYTES),
-        BufferPoolMetric::UNUSED_RESERVATION_BYTES));
+  IntGauge* negated_unused_reservation = obj_pool_->Add(new NegatedGauge(
+      MakeTMetricDef("negated_unused_reservation", TMetricKind::GAUGE, TUnit::BYTES),
+      BufferPoolMetric::UNUSED_RESERVATION_BYTES));
   obj_pool_->Add(new MemTracker(negated_unused_reservation, -1,
       "Buffer Pool: Unused Reservation", mem_tracker_.get()));
 #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
@@ -350,13 +350,13 @@ Status ExecEnv::Init() {
   // reserved (TcmallocMetric::PHYSICAL_BYTES_RESERVED) and the bytes in use
   // (TcmallocMetrics::BYTES_IN_USE). This overhead accounts for all the cached freelists
   // used by TCMalloc.
-  IntGauge* negated_bytes_in_use = obj_pool_->Add(new NegatedGauge<int64_t>(
+  IntGauge* negated_bytes_in_use = obj_pool_->Add(new NegatedGauge(
       MakeTMetricDef("negated_tcmalloc_bytes_in_use", TMetricKind::GAUGE, TUnit::BYTES),
       TcmallocMetric::BYTES_IN_USE));
   vector<IntGauge*> overhead_metrics;
   overhead_metrics.push_back(negated_bytes_in_use);
   overhead_metrics.push_back(TcmallocMetric::PHYSICAL_BYTES_RESERVED);
-  SumGauge<int64_t>* tcmalloc_overhead = obj_pool_->Add(new SumGauge<int64_t>(
+  SumGauge* tcmalloc_overhead = obj_pool_->Add(new SumGauge(
       MakeTMetricDef("tcmalloc_overhead", TMetricKind::GAUGE, TUnit::BYTES),
       overhead_metrics));
   obj_pool_->Add(

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index dc14050..21daa96 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -335,8 +335,8 @@ void ScanRange::Close() {
       struct hdfsHedgedReadMetrics* hedged_metrics;
       int success = hdfsGetHedgedReadMetrics(fs_, &hedged_metrics);
       if (success == 0) {
-        ImpaladMetrics::HEDGED_READ_OPS->set_value(hedged_metrics->hedgedReadOps);
-        ImpaladMetrics::HEDGED_READ_OPS_WIN->set_value(hedged_metrics->hedgedReadOpsWin);
+        ImpaladMetrics::HEDGED_READ_OPS->SetValue(hedged_metrics->hedgedReadOps);
+        ImpaladMetrics::HEDGED_READ_OPS_WIN->SetValue(hedged_metrics->hedgedReadOpsWin);
         hdfsFreeHedgedReadMetrics(hedged_metrics);
       }
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 348b9ab..86955c8 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -63,10 +63,10 @@ KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
       boost::bind(&KrpcDataStreamMgr::DeserializeThreadFn, this, _1, _2)) {
   MetricGroup* dsm_metrics = metrics->GetOrCreateChildGroup("datastream-manager");
   num_senders_waiting_ =
-      dsm_metrics->AddGauge<int64_t>("senders-blocked-on-recvr-creation", 0L);
+      dsm_metrics->AddGauge("senders-blocked-on-recvr-creation", 0L);
   total_senders_waited_ =
-      dsm_metrics->AddCounter<int64_t>("total-senders-blocked-on-recvr-creation", 0L);
-  num_senders_timedout_ = dsm_metrics->AddCounter<int64_t>(
+      dsm_metrics->AddCounter("total-senders-blocked-on-recvr-creation", 0L);
+  num_senders_timedout_ = dsm_metrics->AddCounter(
       "total-senders-timedout-waiting-for-recvr-creation", 0L);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/mem-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-test.cc b/be/src/runtime/mem-tracker-test.cc
index 4aaac05..faeb6a9 100644
--- a/be/src/runtime/mem-tracker-test.cc
+++ b/be/src/runtime/mem-tracker-test.cc
@@ -62,13 +62,13 @@ TEST(MemTestTest, ConsumptionMetric) {
   md.__set_units(TUnit::BYTES);
   md.__set_kind(TMetricKind::GAUGE);
   IntGauge metric(md, 0);
-  EXPECT_EQ(metric.value(), 0);
+  EXPECT_EQ(metric.GetValue(), 0);
 
   TMetricDef neg_md;
   neg_md.__set_key("neg_test");
   neg_md.__set_units(TUnit::BYTES);
   neg_md.__set_kind(TMetricKind::GAUGE);
-  NegatedGauge<int64_t> neg_metric(neg_md, &metric);
+  NegatedGauge neg_metric(neg_md, &metric);
 
   MemTracker t(&metric, 100, "");
   MemTracker neg_t(&neg_metric, 100, "");

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 98f45db..e5aa290 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -211,16 +211,16 @@ MemTracker::~MemTracker() {
 }
 
 void MemTracker::RegisterMetrics(MetricGroup* metrics, const string& prefix) {
-  num_gcs_metric_ = metrics->AddCounter<int64_t>(Substitute("$0.num-gcs", prefix), 0);
+  num_gcs_metric_ = metrics->AddCounter(Substitute("$0.num-gcs", prefix), 0);
 
   // TODO: Consider a total amount of bytes freed counter
-  bytes_freed_by_last_gc_metric_ = metrics->AddGauge<int64_t>(
+  bytes_freed_by_last_gc_metric_ = metrics->AddGauge(
       Substitute("$0.bytes-freed-by-last-gc", prefix), -1);
 
-  bytes_over_limit_metric_ = metrics->AddGauge<int64_t>(
+  bytes_over_limit_metric_ = metrics->AddGauge(
       Substitute("$0.bytes-over-limit", prefix), -1);
 
-  limit_metric_ = metrics->AddGauge<int64_t>(Substitute("$0.limit", prefix), limit_);
+  limit_metric_ = metrics->AddGauge(Substitute("$0.limit", prefix), limit_);
 }
 
 // Calling this on the query tracker results in output like:
@@ -430,7 +430,7 @@ bool MemTracker::GcMemory(int64_t max_consumption) {
   }
 
   if (bytes_freed_by_last_gc_metric_ != NULL) {
-    bytes_freed_by_last_gc_metric_->set_value(pre_gc_consumption - curr_consumption);
+    bytes_freed_by_last_gc_metric_->SetValue(pre_gc_consumption - curr_consumption);
   }
   return curr_consumption > max_consumption;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index fb1cd90..c582d72 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -250,7 +250,7 @@ class MemTracker {
   bool LimitExceeded() {
     if (UNLIKELY(CheckLimitExceeded())) {
       if (bytes_over_limit_metric_ != NULL) {
-        bytes_over_limit_metric_->set_value(consumption() - limit_);
+        bytes_over_limit_metric_->SetValue(consumption() - limit_);
       }
       return GcMemory(limit_);
     }
@@ -274,7 +274,7 @@ class MemTracker {
   /// call if this tracker has a consumption metric.
   void RefreshConsumptionFromMetric() {
     DCHECK(consumption_metric_ != nullptr);
-    consumption_->Set(consumption_metric_->value());
+    consumption_->Set(consumption_metric_->GetValue());
   }
 
   int64_t limit() const { return limit_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 4f30f4e..316b712 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -123,7 +123,7 @@ void QueryExecMgr::StartQueryHelper(QueryState* qs) {
 #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
   // tcmalloc and address or thread sanitizer cannot be used together
   if (FLAGS_log_mem_usage_interval > 0) {
-    uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value();
+    uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->GetValue();
     if (num_complete % FLAGS_log_mem_usage_interval == 0) {
       char buf[2048];
       // This outputs how much memory is currently being used by this impalad

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 10c8033..259cd34 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -381,11 +381,13 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
       << " fragment_idx=" << fis->instance_ctx().fragment_idx
       << " per_fragment_instance_idx=" << fis->instance_ctx().per_fragment_instance_idx
       << " coord_state_idx=" << rpc_params().coord_state_idx
-      << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value();
+      << " #in-flight="
+      << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
   Status status = fis->Exec();
   ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
   VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id())
-      << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value()
+      << " #in-flight="
+      << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue()
       << " status=" << status;
   // initiate cancellation if nobody has done so yet
   if (!status.ok()) Cancel();

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index fbc0a36..3091c58 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -78,7 +78,7 @@ class TmpFileMgrTest : public ::testing::Test {
     vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->ActiveTmpDevices();
     IntGauge* active_metric =
         metrics_->FindMetricForTesting<IntGauge>("tmp-file-mgr.active-scratch-dirs");
-    EXPECT_EQ(active.size(), active_metric->value());
+    EXPECT_EQ(active.size(), active_metric->GetValue());
     SetMetric<string>* active_set_metric =
         metrics_->FindMetricForTesting<SetMetric<string>>(
         "tmp-file-mgr.active-scratch-dirs.list");

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 650af0b..d35d302 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -132,10 +132,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
 
   DCHECK(metrics != nullptr);
   num_active_scratch_dirs_metric_ =
-      metrics->AddGauge<int64_t>(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
+      metrics->AddGauge(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
   active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(
       metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
-  num_active_scratch_dirs_metric_->set_value(tmp_dirs_.size());
+  num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size());
   for (int i = 0; i < tmp_dirs_.size(); ++i) {
     active_scratch_dirs_metric_->Add(tmp_dirs_[i]);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 99f659a..f43af2c 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -482,9 +482,9 @@ bool AdmissionController::RejectImmediately(QuerySchedule* schedule,
 }
 
 void AdmissionController::PoolStats::UpdateConfigMetrics(const TPoolConfig& pool_cfg) {
-  metrics_.pool_max_mem_resources->set_value(pool_cfg.max_mem_resources);
-  metrics_.pool_max_requests->set_value(pool_cfg.max_requests);
-  metrics_.pool_max_queued->set_value(pool_cfg.max_queued);
+  metrics_.pool_max_mem_resources->SetValue(pool_cfg.max_mem_resources);
+  metrics_.pool_max_requests->SetValue(pool_cfg.max_requests);
+  metrics_.pool_max_queued->SetValue(pool_cfg.max_queued);
 }
 
 Status AdmissionController::AdmitQuery(QuerySchedule* schedule) {
@@ -734,18 +734,18 @@ void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reser
 
   if (agg_num_running_ == num_running && agg_num_queued_ == num_queued &&
       agg_mem_reserved_ == mem_reserved) {
-    DCHECK_EQ(num_running, metrics_.agg_num_running->value());
-    DCHECK_EQ(num_queued, metrics_.agg_num_queued->value());
-    DCHECK_EQ(mem_reserved, metrics_.agg_mem_reserved->value());
+    DCHECK_EQ(num_running, metrics_.agg_num_running->GetValue());
+    DCHECK_EQ(num_queued, metrics_.agg_num_queued->GetValue());
+    DCHECK_EQ(mem_reserved, metrics_.agg_mem_reserved->GetValue());
     return;
   }
   VLOG_ROW << "Recomputed agg stats, previous: " << DebugString();
   agg_num_running_ = num_running;
   agg_num_queued_ = num_queued;
   agg_mem_reserved_ = mem_reserved;
-  metrics_.agg_num_running->set_value(num_running);
-  metrics_.agg_num_queued->set_value(num_queued);
-  metrics_.agg_mem_reserved->set_value(mem_reserved);
+  metrics_.agg_num_running->SetValue(num_running);
+  metrics_.agg_num_queued->SetValue(num_queued);
+  metrics_.agg_mem_reserved->SetValue(mem_reserved);
   VLOG_ROW << "Updated: " << DebugString();
 }
 
@@ -782,12 +782,12 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() {
   if (current_reserved != local_stats_.backend_mem_reserved) {
     parent_->pools_for_updates_.insert(name_);
     local_stats_.backend_mem_reserved = current_reserved;
-    metrics_.local_backend_mem_reserved->set_value(current_reserved);
+    metrics_.local_backend_mem_reserved->SetValue(current_reserved);
   }
 
   const int64_t current_usage =
       tracker == nullptr ? static_cast<int64_t>(0) : tracker->consumption();
-  metrics_.local_backend_mem_usage->set_value(current_usage);
+  metrics_.local_backend_mem_usage->SetValue(current_usage);
 }
 
 void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
@@ -906,44 +906,44 @@ AdmissionController::GetPoolStats(const string& pool_name) {
 }
 
 void AdmissionController::PoolStats::InitMetrics() {
-  metrics_.total_admitted = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_admitted = parent_->metrics_group_->AddCounter(
       TOTAL_ADMITTED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.total_queued = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_queued = parent_->metrics_group_->AddCounter(
       TOTAL_QUEUED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.total_dequeued = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_dequeued = parent_->metrics_group_->AddCounter(
       TOTAL_DEQUEUED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.total_rejected = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_rejected = parent_->metrics_group_->AddCounter(
       TOTAL_REJECTED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.total_timed_out = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_timed_out = parent_->metrics_group_->AddCounter(
       TOTAL_TIMED_OUT_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.total_released = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_released = parent_->metrics_group_->AddCounter(
       TOTAL_RELEASED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.time_in_queue_ms = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.time_in_queue_ms = parent_->metrics_group_->AddCounter(
       TIME_IN_QUEUE_METRIC_KEY_FORMAT, 0, name_);
 
-  metrics_.agg_num_running = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.agg_num_running = parent_->metrics_group_->AddGauge(
       AGG_NUM_RUNNING_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.agg_num_queued = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.agg_num_queued = parent_->metrics_group_->AddGauge(
       AGG_NUM_QUEUED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.agg_mem_reserved = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.agg_mem_reserved = parent_->metrics_group_->AddGauge(
       AGG_MEM_RESERVED_METRIC_KEY_FORMAT, 0, name_);
 
-  metrics_.local_mem_admitted = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.local_mem_admitted = parent_->metrics_group_->AddGauge(
       LOCAL_MEM_ADMITTED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.local_num_admitted_running = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.local_num_admitted_running = parent_->metrics_group_->AddGauge(
       LOCAL_NUM_ADMITTED_RUNNING_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.local_num_queued = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.local_num_queued = parent_->metrics_group_->AddGauge(
       LOCAL_NUM_QUEUED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.local_backend_mem_usage = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.local_backend_mem_usage = parent_->metrics_group_->AddGauge(
       LOCAL_BACKEND_MEM_USAGE_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.local_backend_mem_reserved = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.local_backend_mem_reserved = parent_->metrics_group_->AddGauge(
       LOCAL_BACKEND_MEM_RESERVED_METRIC_KEY_FORMAT, 0, name_);
 
-  metrics_.pool_max_mem_resources = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.pool_max_mem_resources = parent_->metrics_group_->AddGauge(
       POOL_MAX_MEM_RESOURCES_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.pool_max_requests = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.pool_max_requests = parent_->metrics_group_->AddGauge(
       POOL_MAX_REQUESTS_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.pool_max_queued = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.pool_max_queued = parent_->metrics_group_->AddGauge(
       POOL_MAX_QUEUED_METRIC_KEY_FORMAT, 0, name_);
 }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5cf0f01..e924f50 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -97,11 +97,10 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
     // This is after registering with the statestored, so we already have to synchronize
     // access to the executors_config_ shared_ptr.
     int num_backends = GetExecutorsConfig()->NumBackends();
-    total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0);
-    total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0);
+    total_assignments_ = metrics_->AddCounter(ASSIGNMENTS_KEY, 0);
+    total_local_assignments_ = metrics_->AddCounter(LOCAL_ASSIGNMENTS_KEY, 0);
     initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
-    num_fragment_instances_metric_ =
-        metrics_->AddGauge<int64_t>(NUM_BACKENDS_KEY, num_backends);
+    num_fragment_instances_metric_ = metrics_->AddGauge(NUM_BACKENDS_KEY, num_backends);
   }
 
   if (statestore_subscriber_ != nullptr) {
@@ -197,7 +196,7 @@ void Scheduler::UpdateMembership(
 
   if (metrics_ != nullptr) {
     /// TODO-MT: fix this (do we even need to report it?)
-    num_fragment_instances_metric_->set_value(current_executors_.size());
+    num_fragment_instances_metric_->SetValue(current_executors_.size());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 6358145..a62130c 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1059,8 +1059,8 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
 Status ImpalaServer::UpdateCatalogMetrics() {
   TGetDbsResult dbs;
   RETURN_IF_ERROR(exec_env_->frontend()->GetDbs(nullptr, nullptr, &dbs));
-  ImpaladMetrics::CATALOG_NUM_DBS->set_value(dbs.dbs.size());
-  ImpaladMetrics::CATALOG_NUM_TABLES->set_value(0L);
+  ImpaladMetrics::CATALOG_NUM_DBS->SetValue(dbs.dbs.size());
+  ImpaladMetrics::CATALOG_NUM_TABLES->SetValue(0L);
   for (const TDatabase& db: dbs.dbs) {
     TGetTablesResult table_names;
     RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db.db_name, nullptr, nullptr,
@@ -1433,7 +1433,7 @@ void ImpalaServer::CatalogUpdateCallback(
       TTopicDelta& update = subscriber_topic_updates->back();
       update.topic_name = CatalogServer::IMPALA_CATALOG_TOPIC;
       update.__set_from_version(0L);
-      ImpaladMetrics::CATALOG_READY->set_value(false);
+      ImpaladMetrics::CATALOG_READY->SetValue(false);
       // Dropped all cached lib files (this behaves as if all functions and data
       // sources are dropped).
       LibCache::instance()->DropCache();
@@ -1447,7 +1447,7 @@ void ImpalaServer::CatalogUpdateCallback(
         LOG(INFO) << "Catalog topic update applied with version: " << new_catalog_version
             << " new min catalog object version: " << resp.min_catalog_object_version;
       }
-      ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0);
+      ImpaladMetrics::CATALOG_READY->SetValue(new_catalog_version > 0);
       // TODO: deal with an error status
       discard_result(UpdateCatalogMetrics());
       // Remove all dropped objects from the library cache.
@@ -2130,7 +2130,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
     LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
   }
   services_started_ = true;
-  ImpaladMetrics::IMPALA_SERVER_READY->set_value(true);
+  ImpaladMetrics::IMPALA_SERVER_READY->SetValue(true);
   LOG(INFO) << "Impala has started.";
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/service/session-expiry-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index fa69476..a211701 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -58,8 +58,8 @@ TEST(SessionTest, TestExpiry) {
   IntGauge* hs2_session_metric =
       impala->metrics()->FindMetricForTesting<IntGauge>(
           ImpaladMetricKeys::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS);
-  EXPECT_EQ(expired_metric->value(), 0L);
-  EXPECT_EQ(beeswax_session_metric->value(), 0L);
+  EXPECT_EQ(expired_metric->GetValue(), 0L);
+  EXPECT_EQ(beeswax_session_metric->GetValue(), 0L);
 
   {
     scoped_ptr<ThriftClient<ImpalaServiceClient>> beeswax_clients[NUM_SESSIONS];
@@ -80,16 +80,16 @@ TEST(SessionTest, TestExpiry) {
     }
 
     int64_t start = UnixMillis();
-    while (expired_metric->value() != NUM_SESSIONS * 2 &&
+    while (expired_metric->GetValue() != NUM_SESSIONS * 2 &&
       UnixMillis() - start < MAX_IDLE_TIMEOUT_MS) {
       SleepForMs(100);
     }
 
-    ASSERT_EQ(expired_metric->value(), NUM_SESSIONS * 2)
+    ASSERT_EQ(expired_metric->GetValue(), NUM_SESSIONS * 2)
         << "Sessions did not expire within "<< MAX_IDLE_TIMEOUT_MS / 1000 <<" secs";
-    ASSERT_EQ(beeswax_session_metric->value(), NUM_SESSIONS)
+    ASSERT_EQ(beeswax_session_metric->GetValue(), NUM_SESSIONS)
         << "Beeswax sessions unexpectedly closed after expiration";
-    ASSERT_EQ(hs2_session_metric->value(), NUM_SESSIONS)
+    ASSERT_EQ(hs2_session_metric->GetValue(), NUM_SESSIONS)
         << "HiveServer2 sessions unexpectedly closed after expiration";
 
     TPingImpalaServiceResp resp;

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 678236e..99da183 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -113,7 +113,7 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
       metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")) {
   connected_to_statestore_metric_ =
       metrics_->AddProperty("statestore-subscriber.connected", false);
-  last_recovery_duration_metric_ = metrics_->AddGauge(
+  last_recovery_duration_metric_ = metrics_->AddDoubleGauge(
       "statestore-subscriber.last-recovery-duration", 0.0);
   last_recovery_time_metric_ = metrics_->AddProperty<string>(
       "statestore-subscriber.last-recovery-time", "N/A");
@@ -164,12 +164,12 @@ Status StatestoreSubscriber::Register() {
   RETURN_IF_ERROR(client.DoRpc(&StatestoreServiceClientWrapper::RegisterSubscriber,
       request, &response));
   Status status = Status(response.status);
-  if (status.ok()) connected_to_statestore_metric_->set_value(true);
+  if (status.ok()) connected_to_statestore_metric_->SetValue(true);
   if (response.__isset.registration_id) {
     lock_guard<mutex> l(registration_id_lock_);
     registration_id_ = response.registration_id;
     const string& registration_string = PrintId(registration_id_);
-    registration_id_metric_->set_value(registration_string);
+    registration_id_metric_->SetValue(registration_string);
     VLOG(1) << "Subscriber registration ID: " << registration_string;
   } else {
     VLOG(1) << "No subscriber registration ID received from statestore";
@@ -243,7 +243,7 @@ void StatestoreSubscriber::RecoveryModeChecker() {
       lock_guard<mutex> l(lock_);
       MonotonicStopWatch recovery_timer;
       recovery_timer.Start();
-      connected_to_statestore_metric_->set_value(false);
+      connected_to_statestore_metric_->SetValue(false);
       LOG(INFO) << subscriber_id_
                 << ": Connection with statestore lost, entering recovery mode";
       uint32_t attempt_count = 1;
@@ -265,7 +265,7 @@ void StatestoreSubscriber::RecoveryModeChecker() {
                        << status.GetDetail();
           SleepForMs(SLEEP_INTERVAL_MS);
         }
-        last_recovery_duration_metric_->set_value(
+        last_recovery_duration_metric_->SetValue(
             recovery_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
       }
       // When we're successful in re-registering, we don't do anything
@@ -273,9 +273,9 @@ void StatestoreSubscriber::RecoveryModeChecker() {
       // responsibility of individual clients to post missing updates
       // back to the statestore. This saves a lot of complexity where
       // we would otherwise have to cache updates here.
-      last_recovery_duration_metric_->set_value(
+      last_recovery_duration_metric_->SetValue(
           recovery_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-      last_recovery_time_metric_->set_value(CurrentTimeString());
+      last_recovery_time_metric_->SetValue(CurrentTimeString());
     }
 
     SleepForMs(SLEEP_INTERVAL_MS);

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index d0a4851..b135e38 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -236,13 +236,12 @@ Statestore::Statestore(MetricGroup* metrics)
         FLAGS_statestore_max_missed_heartbeats / 2)) {
 
   DCHECK(metrics != NULL);
-  num_subscribers_metric_ =
-      metrics->AddGauge<int64_t>(STATESTORE_LIVE_SUBSCRIBERS, 0);
+  num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
   subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics,
       STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>());
-  key_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0);
-  value_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0);
-  topic_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0);
+  key_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0);
+  value_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0);
+  topic_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0);
 
   topic_update_duration_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_UPDATE_DURATION);
@@ -398,7 +397,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     subscribers_.insert(make_pair(subscriber_id, current_registration));
     failure_detector_->UpdateHeartbeat(
         PrintId(current_registration->registration_id()), true);
-    num_subscribers_metric_->set_value(subscribers_.size());
+    num_subscribers_metric_->SetValue(subscribers_.size());
     subscriber_set_metric_->Add(subscriber_id);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/common-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/common-metrics.cc b/be/src/util/common-metrics.cc
index d147862..114e0e0 100644
--- a/be/src/util/common-metrics.cc
+++ b/be/src/util/common-metrics.cc
@@ -33,7 +33,7 @@ void CommonMetrics::InitCommonMetrics(MetricGroup* metric_group) {
   KUDU_CLIENT_VERSION = metric_group->AddProperty<string>(
       KUDU_CLIENT_VERSION_METRIC_NAME, kudu::client::GetShortVersionString());
 
-  PROCESS_START_TIME->set_value(CurrentTimeString());
+  PROCESS_START_TIME->SetValue(CurrentTimeString());
 }
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/default-path-handlers.cc
----------------------------------------------------------------------
diff --git a/be/src/util/default-path-handlers.cc b/be/src/util/default-path-handlers.cc
index 88d23f1..10966b4 100644
--- a/be/src/util/default-path-handlers.cc
+++ b/be/src/util/default-path-handlers.cc
@@ -211,7 +211,7 @@ void RootHandler(const Webserver::ArgumentMap& args, Document* document) {
     document->GetAllocator());
 
   if (CommonMetrics::PROCESS_START_TIME != nullptr) {
-    Value process_start_time(CommonMetrics::PROCESS_START_TIME->value().c_str(),
+    Value process_start_time(CommonMetrics::PROCESS_START_TIME->GetValue().c_str(),
       document->GetAllocator());
     document->AddMember("process_start_time", process_start_time,
       document->GetAllocator());

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/impalad-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 1325f2e..18e96a8 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -162,70 +162,70 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
   IMPALA_SERVER_READY = m->AddProperty<bool>(
       ImpaladMetricKeys::IMPALA_SERVER_READY, false);
 
-  IMPALA_SERVER_NUM_QUERIES = m->AddCounter<int64_t>(
+  IMPALA_SERVER_NUM_QUERIES = m->AddCounter(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_QUERIES, 0);
-  NUM_QUERIES_REGISTERED = m->AddGauge<int64_t>(ImpaladMetricKeys::NUM_QUERIES_REGISTERED, 0);
-  NUM_QUERIES_EXPIRED = m->AddCounter<int64_t>(
+  NUM_QUERIES_REGISTERED = m->AddGauge(
+      ImpaladMetricKeys::NUM_QUERIES_REGISTERED, 0);
+  NUM_QUERIES_EXPIRED = m->AddCounter(
       ImpaladMetricKeys::NUM_QUERIES_EXPIRED, 0);
-  NUM_QUERIES_SPILLED = m->AddCounter<int64_t>(
+  NUM_QUERIES_SPILLED = m->AddCounter(
       ImpaladMetricKeys::NUM_QUERIES_SPILLED, 0);
-  IMPALA_SERVER_NUM_FRAGMENTS = m->AddCounter<int64_t>(
+  IMPALA_SERVER_NUM_FRAGMENTS = m->AddCounter(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS, 0);
   IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT = m->AddGauge(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT, 0L);
-  IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS = m->AddGauge<int64_t>(
+  IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS = m->AddGauge(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS, 0);
-  IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS = m->AddGauge<int64_t>(
+  IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS = m->AddGauge(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS, 0);
-  NUM_SESSIONS_EXPIRED = m->AddCounter<int64_t>(
+  NUM_SESSIONS_EXPIRED = m->AddCounter(
       ImpaladMetricKeys::NUM_SESSIONS_EXPIRED, 0);
-  RESULTSET_CACHE_TOTAL_NUM_ROWS = m->AddGauge<int64_t>(
+  RESULTSET_CACHE_TOTAL_NUM_ROWS = m->AddGauge(
       ImpaladMetricKeys::RESULTSET_CACHE_TOTAL_NUM_ROWS, 0);
-  RESULTSET_CACHE_TOTAL_BYTES = m->AddGauge<int64_t>(
+  RESULTSET_CACHE_TOTAL_BYTES = m->AddGauge(
       ImpaladMetricKeys::RESULTSET_CACHE_TOTAL_BYTES, 0);
 
   // Initialize scan node metrics
-  NUM_RANGES_PROCESSED = m->AddCounter<int64_t>(
+  NUM_RANGES_PROCESSED = m->AddCounter(
       ImpaladMetricKeys::TOTAL_SCAN_RANGES_PROCESSED, 0);
-  NUM_RANGES_MISSING_VOLUME_ID = m->AddCounter<int64_t>(
+  NUM_RANGES_MISSING_VOLUME_ID = m->AddCounter(
       ImpaladMetricKeys::NUM_SCAN_RANGES_MISSING_VOLUME_ID, 0);
 
   // Initialize memory usage metrics
-  MEM_POOL_TOTAL_BYTES = m->AddGauge<int64_t>(
+  MEM_POOL_TOTAL_BYTES = m->AddGauge(
       ImpaladMetricKeys::MEM_POOL_TOTAL_BYTES, 0);
-  HASH_TABLE_TOTAL_BYTES = m->AddGauge<int64_t>(
+  HASH_TABLE_TOTAL_BYTES = m->AddGauge(
       ImpaladMetricKeys::HASH_TABLE_TOTAL_BYTES, 0);
 
   // Initialize insert metrics
-  NUM_FILES_OPEN_FOR_INSERT = m->AddGauge<int64_t>(
+  NUM_FILES_OPEN_FOR_INSERT = m->AddGauge(
       ImpaladMetricKeys::NUM_FILES_OPEN_FOR_INSERT, 0);
 
   // Initialize IO mgr metrics
-  IO_MGR_NUM_OPEN_FILES = m->AddGauge<int64_t>(
-      ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
-  IO_MGR_NUM_BUFFERS = m->AddGauge<int64_t>(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0);
-  IO_MGR_TOTAL_BYTES = m->AddGauge<int64_t>(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0);
-  IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge<int64_t>(
+  IO_MGR_NUM_OPEN_FILES = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
+  IO_MGR_NUM_BUFFERS = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0);
+  IO_MGR_TOTAL_BYTES = m->AddGauge(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0);
+  IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS, 0);
-  IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge<int64_t>(
+  IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES, 0);
-  IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge<int64_t>(
+  IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING, 0);
 
-  IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = m->AddGauge<int64_t>(
+  IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT, 0);
 
-  IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = m->AddGauge<int64_t>(
+  IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT, 0);
 
-  IO_MGR_BYTES_READ = m->AddCounter<int64_t>(ImpaladMetricKeys::IO_MGR_BYTES_READ, 0);
-  IO_MGR_LOCAL_BYTES_READ = m->AddCounter<int64_t>(
+  IO_MGR_BYTES_READ = m->AddCounter(ImpaladMetricKeys::IO_MGR_BYTES_READ, 0);
+  IO_MGR_LOCAL_BYTES_READ = m->AddCounter(
       ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0);
-  IO_MGR_CACHED_BYTES_READ = m->AddCounter<int64_t>(
+  IO_MGR_CACHED_BYTES_READ = m->AddCounter(
       ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0);
-  IO_MGR_SHORT_CIRCUIT_BYTES_READ = m->AddCounter<int64_t>(
+  IO_MGR_SHORT_CIRCUIT_BYTES_READ = m->AddCounter(
       ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ, 0);
-  IO_MGR_BYTES_WRITTEN = m->AddCounter<int64_t>(
+  IO_MGR_BYTES_WRITTEN = m->AddCounter(
       ImpaladMetricKeys::IO_MGR_BYTES_WRITTEN, 0);
 
   IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO =
@@ -233,8 +233,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO);
 
   // Initialize catalog metrics
-  CATALOG_NUM_DBS = m->AddGauge<int64_t>(ImpaladMetricKeys::CATALOG_NUM_DBS, 0);
-  CATALOG_NUM_TABLES = m->AddGauge<int64_t>(ImpaladMetricKeys::CATALOG_NUM_TABLES, 0);
+  CATALOG_NUM_DBS = m->AddGauge(ImpaladMetricKeys::CATALOG_NUM_DBS, 0);
+  CATALOG_NUM_TABLES = m->AddGauge(ImpaladMetricKeys::CATALOG_NUM_TABLES, 0);
   CATALOG_READY = m->AddProperty<bool>(ImpaladMetricKeys::CATALOG_READY, false);
 
   // Maximum duration to be tracked by the query durations metric. No particular reasoning
@@ -248,8 +248,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       MetricDefs::Get(ImpaladMetricKeys::DDL_DURATIONS), FIVE_HOURS_IN_MS, 3));
 
   // Initialize Hedged read metrics
-  HEDGED_READ_OPS = m->AddCounter<int64_t>(ImpaladMetricKeys::HEDGED_READ_OPS, 0);
-  HEDGED_READ_OPS_WIN = m->AddCounter<int64_t>(ImpaladMetricKeys::HEDGED_READ_OPS_WIN, 0);
+  HEDGED_READ_OPS = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS, 0);
+  HEDGED_READ_OPS_WIN = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS_WIN, 0);
 
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index 3308bf4..fd78343 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -32,7 +32,7 @@ using namespace strings;
 
 DECLARE_bool(mmap_buffers);
 
-SumGauge<int64_t>* AggregateMemoryMetrics::TOTAL_USED = nullptr;
+SumGauge* AggregateMemoryMetrics::TOTAL_USED = nullptr;
 IntGauge* AggregateMemoryMetrics::NUM_MAPS = nullptr;
 IntGauge* AggregateMemoryMetrics::MAPPED_BYTES = nullptr;
 IntGauge* AggregateMemoryMetrics::RSS = nullptr;
@@ -110,19 +110,19 @@ Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_met
 #endif
   MetricGroup* aggregate_metrics = metrics->GetOrCreateChildGroup("memory");
   AggregateMemoryMetrics::TOTAL_USED = aggregate_metrics->RegisterMetric(
-      new SumGauge<int64_t>(MetricDefs::Get("memory.total-used"), used_metrics));
+      new SumGauge(MetricDefs::Get("memory.total-used"), used_metrics));
   if (register_jvm_metrics) {
     RETURN_IF_ERROR(JvmMetric::InitMetrics(metrics->GetOrCreateChildGroup("jvm")));
   }
 
   if (MemInfo::HaveSmaps()) {
     AggregateMemoryMetrics::NUM_MAPS =
-        aggregate_metrics->AddGauge<int64_t>("memory.num-maps", 0U);
+        aggregate_metrics->AddGauge("memory.num-maps", 0U);
     AggregateMemoryMetrics::MAPPED_BYTES =
-        aggregate_metrics->AddGauge<int64_t>("memory.mapped-bytes", 0U);
-    AggregateMemoryMetrics::RSS = aggregate_metrics->AddGauge<int64_t>("memory.rss", 0U);
+        aggregate_metrics->AddGauge("memory.mapped-bytes", 0U);
+    AggregateMemoryMetrics::RSS = aggregate_metrics->AddGauge("memory.rss", 0U);
     AggregateMemoryMetrics::ANON_HUGE_PAGE_BYTES =
-        aggregate_metrics->AddGauge<int64_t>("memory.anon-huge-page-bytes", 0U);
+        aggregate_metrics->AddGauge("memory.anon-huge-page-bytes", 0U);
   }
   ThpConfig thp_config = MemInfo::ParseThpConfig();
   AggregateMemoryMetrics::THP_ENABLED =
@@ -139,16 +139,16 @@ void AggregateMemoryMetrics::Refresh() {
   if (NUM_MAPS != nullptr) {
     // Only call ParseSmaps() if the metrics were created.
     MappedMemInfo map_info = MemInfo::ParseSmaps();
-    NUM_MAPS->set_value(map_info.num_maps);
-    MAPPED_BYTES->set_value(map_info.size_kb * 1024);
-    RSS->set_value(map_info.rss_kb * 1024);
-    ANON_HUGE_PAGE_BYTES->set_value(map_info.anon_huge_pages_kb * 1024);
+    NUM_MAPS->SetValue(map_info.num_maps);
+    MAPPED_BYTES->SetValue(map_info.size_kb * 1024);
+    RSS->SetValue(map_info.rss_kb * 1024);
+    ANON_HUGE_PAGE_BYTES->SetValue(map_info.anon_huge_pages_kb * 1024);
   }
 
   ThpConfig thp_config = MemInfo::ParseThpConfig();
-  THP_ENABLED->set_value(thp_config.enabled);
-  THP_DEFRAG->set_value(thp_config.defrag);
-  THP_KHUGEPAGED_DEFRAG->set_value(thp_config.khugepaged_defrag);
+  THP_ENABLED->SetValue(thp_config.enabled);
+  THP_DEFRAG->SetValue(thp_config.defrag);
+  THP_KHUGEPAGED_DEFRAG->SetValue(thp_config.khugepaged_defrag);
 }
 
 JvmMetric* JvmMetric::CreateAndRegister(MetricGroup* metrics, const string& key,
@@ -192,35 +192,36 @@ Status JvmMetric::InitMetrics(MetricGroup* metrics) {
   return Status::OK();
 }
 
-void JvmMetric::CalculateValue() {
+int64_t JvmMetric::GetValue() {
   TGetJvmMetricsRequest request;
   request.get_all = false;
   request.__set_memory_pool(mempool_name_);
   TGetJvmMetricsResponse response;
-  if (!JniUtil::GetJvmMetrics(request, &response).ok()) return;
-  if (response.memory_pools.size() != 1) return;
+  if (!JniUtil::GetJvmMetrics(request, &response).ok()) return 0;
+  if (response.memory_pools.size() != 1) return 0;
   TJvmMemoryPool& pool = response.memory_pools[0];
   DCHECK(pool.name == mempool_name_);
   switch (metric_type_) {
-    case MAX: value_ = pool.max;
-      return;
-    case INIT: value_ = pool.init;
-      return;
-    case CURRENT: value_ = pool.used;
-      return;
-    case COMMITTED: value_ = pool.committed;
-      return;
-    case PEAK_MAX: value_ = pool.peak_max;
-      return;
-    case PEAK_INIT: value_ = pool.peak_init;
-      return;
-    case PEAK_CURRENT: value_ = pool.peak_used;
-      return;
-    case PEAK_COMMITTED: value_ = pool.peak_committed;
-      return;
+    case MAX:
+      return pool.max;
+    case INIT:
+      return pool.init;
+    case CURRENT:
+      return pool.used;
+    case COMMITTED:
+      return pool.committed;
+    case PEAK_MAX:
+      return pool.peak_max;
+    case PEAK_INIT:
+      return pool.peak_init;
+    case PEAK_CURRENT:
+      return pool.peak_used;
+    case PEAK_COMMITTED:
+      return pool.peak_committed;
     default:
       DCHECK(false) << "Unknown JvmMetricType: " << metric_type_;
   }
+  return 0;
 }
 
 Status BufferPoolMetric::InitMetrics(MetricGroup* metrics,
@@ -263,47 +264,39 @@ BufferPoolMetric::BufferPoolMetric(const TMetricDef& def, BufferPoolMetricType t
     global_reservations_(global_reservations),
     buffer_pool_(buffer_pool) {}
 
-void BufferPoolMetric::CalculateValue() {
+int64_t BufferPoolMetric::GetValue() {
   // IMPALA-6362: we have to be careful that none of the below calls to ReservationTracker
   // methods acquire ReservationTracker::lock_ to avoid a potential circular dependency
   // with MemTracker::child_trackers_lock_, which may be held when refreshing MemTracker
   // consumption.
   switch (type_) {
     case BufferPoolMetricType::LIMIT:
-      value_ = buffer_pool_->GetSystemBytesLimit();
-      break;
+      return buffer_pool_->GetSystemBytesLimit();
     case BufferPoolMetricType::SYSTEM_ALLOCATED:
-      value_ = buffer_pool_->GetSystemBytesAllocated();
-      break;
+      return buffer_pool_->GetSystemBytesAllocated();
     case BufferPoolMetricType::RESERVED:
-      value_ = global_reservations_->GetReservation();
-      break;
+      return global_reservations_->GetReservation();
     case BufferPoolMetricType::UNUSED_RESERVATION_BYTES: {
       // Estimate the unused reservation based on other aggregate values, defined as
       // the total bytes of reservation where there is no corresponding buffer in use
       // by a client. Buffers are either in-use, free buffers, or attached to clean pages.
       int64_t total_used_reservation = buffer_pool_->GetSystemBytesAllocated()
-        - buffer_pool_->GetFreeBufferBytes()
-        - buffer_pool_->GetCleanPageBytes();
-      value_ = global_reservations_->GetReservation() - total_used_reservation;
-      break;
+          - buffer_pool_->GetFreeBufferBytes()
+          - buffer_pool_->GetCleanPageBytes();
+      return global_reservations_->GetReservation() - total_used_reservation;
     }
     case BufferPoolMetricType::NUM_FREE_BUFFERS:
-      value_ = buffer_pool_->GetNumFreeBuffers();
-      break;
+      return buffer_pool_->GetNumFreeBuffers();
     case BufferPoolMetricType::FREE_BUFFER_BYTES:
-      value_ = buffer_pool_->GetFreeBufferBytes();
-      break;
+      return buffer_pool_->GetFreeBufferBytes();
     case BufferPoolMetricType::CLEAN_PAGES_LIMIT:
-      value_ = buffer_pool_->GetCleanPageBytesLimit();
-      break;
+      return buffer_pool_->GetCleanPageBytesLimit();
     case BufferPoolMetricType::NUM_CLEAN_PAGES:
-      value_ = buffer_pool_->GetNumCleanPages();
-      break;
+      return buffer_pool_->GetNumCleanPages();
     case BufferPoolMetricType::CLEAN_PAGE_BYTES:
-      value_ = buffer_pool_->GetCleanPageBytes();
-      break;
+      return buffer_pool_->GetCleanPageBytes();
     default:
       DCHECK(false) << "Unknown BufferPoolMetricType: " << static_cast<int>(type_);
   }
+  return 0;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 3294c30..6c10e09 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -44,7 +44,7 @@ class AggregateMemoryMetrics {
   /// including JVM memory), which is either in use by queries or cached by the BufferPool
   /// or the malloc implementation.
   /// TODO: IMPALA-691 - consider changing this to include JVM memory.
-  static SumGauge<int64_t>* TOTAL_USED;
+  static SumGauge* TOTAL_USED;
 
   /// The total number of virtual memory regions for the process.
   /// The value must be refreshed by calling Refresh().
@@ -106,9 +106,8 @@ class TcmallocMetric : public IntGauge {
    public:
     PhysicalBytesMetric(const TMetricDef& def) : IntGauge(def, 0) { }
 
-   private:
-    virtual void CalculateValue() {
-      value_ = TOTAL_BYTES_RESERVED->value() - PAGEHEAP_UNMAPPED_BYTES->value();
+    virtual int64_t GetValue() override {
+      return TOTAL_BYTES_RESERVED->GetValue() - PAGEHEAP_UNMAPPED_BYTES->GetValue();
     }
   };
 
@@ -117,20 +116,21 @@ class TcmallocMetric : public IntGauge {
   static TcmallocMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
       const std::string& tcmalloc_var);
 
+  virtual int64_t GetValue() override {
+    int64_t retval = 0;
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
+    MallocExtension::instance()->GetNumericProperty(tcmalloc_var_.c_str(),
+        reinterpret_cast<size_t*>(&retval));
+#endif
+    return retval;
+  }
+
  private:
   /// Name of the tcmalloc property this metric should fetch.
   const std::string tcmalloc_var_;
 
   TcmallocMetric(const TMetricDef& def, const std::string& tcmalloc_var)
-      : IntGauge(def, 0), tcmalloc_var_(tcmalloc_var) { }
-
-  virtual void CalculateValue() {
-#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
-    DCHECK_EQ(sizeof(size_t), sizeof(value_));
-    MallocExtension::instance()->GetNumericProperty(tcmalloc_var_.c_str(),
-        reinterpret_cast<size_t*>(&value_));
-#endif
-  }
+    : IntGauge(def, 0), tcmalloc_var_(tcmalloc_var) { }
 };
 
 /// Alternative to TCMallocMetric if we're running under a sanitizer that replaces
@@ -138,12 +138,16 @@ class TcmallocMetric : public IntGauge {
 class SanitizerMallocMetric : public IntGauge {
  public:
   SanitizerMallocMetric(const TMetricDef& def) : IntGauge(def, 0) {}
+
   static SanitizerMallocMetric* BYTES_ALLOCATED;
- private:
-  virtual void CalculateValue() override {
+
+  virtual int64_t GetValue() override {
 #if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
-    value_ = __sanitizer_get_current_allocated_bytes();
+    return __sanitizer_get_current_allocated_bytes();
+#else
+    return 0;
 #endif
+
   }
 };
 
@@ -157,10 +161,9 @@ class JvmMetric : public IntGauge {
   /// pool (usually ~5 pools plus a synthetic 'total' pool).
   static Status InitMetrics(MetricGroup* metrics) WARN_UNUSED_RESULT;
 
- protected:
   /// Searches through jvm_metrics_response_ for a matching memory pool and pulls out the
   /// right value from that structure according to metric_type_.
-  virtual void CalculateValue();
+  virtual int64_t GetValue() override;
 
  private:
   /// Each names one of the fields in TJvmMemoryPool.
@@ -206,8 +209,7 @@ class BufferPoolMetric : public IntGauge {
   static BufferPoolMetric* NUM_CLEAN_PAGES;
   static BufferPoolMetric* CLEAN_PAGE_BYTES;
 
- protected:
-  virtual void CalculateValue();
+  virtual int64_t GetValue() override;
 
  private:
   friend class ReservationTrackerTest;

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc
index 0126281..bfbfdfe 100644
--- a/be/src/util/metrics-test.cc
+++ b/be/src/util/metrics-test.cc
@@ -36,7 +36,7 @@ namespace impala {
 template <typename M, typename T>
 void AssertValue(M* metric, const T& value,
     const string& human_readable) {
-  EXPECT_EQ(metric->value(), value);
+  EXPECT_EQ(metric->GetValue(), value);
   if (!human_readable.empty()) {
     EXPECT_EQ(metric->ToHumanReadable(), human_readable);
   }
@@ -73,36 +73,36 @@ class MetricsTest : public testing::Test {
 TEST_F(MetricsTest, CounterMetrics) {
   MetricGroup metrics("CounterMetrics");
   AddMetricDef("counter", TMetricKind::COUNTER, TUnit::UNIT);
-  IntCounter* int_counter = metrics.AddCounter<int64_t>("counter", 0);
+  IntCounter* int_counter = metrics.AddCounter("counter", 0);
   AssertValue(int_counter, 0, "0");
   int_counter->Increment(1);
   AssertValue(int_counter, 1, "1");
   int_counter->Increment(10);
   AssertValue(int_counter, 11, "11");
-  int_counter->set_value(3456);
+  int_counter->SetValue(3456);
   AssertValue(int_counter, 3456, "3.46K");
 
   AddMetricDef("counter_with_units", TMetricKind::COUNTER, TUnit::BYTES);
   IntCounter* int_counter_with_units =
-      metrics.AddCounter<int64_t>("counter_with_units", 10);
+      metrics.AddCounter("counter_with_units", 10);
   AssertValue(int_counter_with_units, 10, "10.00 B");
 }
 
 TEST_F(MetricsTest, GaugeMetrics) {
   MetricGroup metrics("GaugeMetrics");
   AddMetricDef("gauge", TMetricKind::GAUGE, TUnit::NONE);
-  IntGauge* int_gauge = metrics.AddGauge<int64_t>("gauge", 0);
+  IntGauge* int_gauge = metrics.AddGauge("gauge", 0);
   AssertValue(int_gauge, 0, "0");
   int_gauge->Increment(-1);
   AssertValue(int_gauge, -1, "-1");
   int_gauge->Increment(10);
   AssertValue(int_gauge, 9, "9");
-  int_gauge->set_value(3456);
+  int_gauge->SetValue(3456);
   AssertValue(int_gauge, 3456, "3456");
 
   AddMetricDef("gauge_with_units", TMetricKind::GAUGE, TUnit::TIME_S);
   IntGauge* int_gauge_with_units =
-      metrics.AddGauge<int64_t>("gauge_with_units", 10);
+      metrics.AddGauge("gauge_with_units", 10);
   AssertValue(int_gauge_with_units, 10, "10s000ms");
 }
 
@@ -111,12 +111,12 @@ TEST_F(MetricsTest, SumGauge) {
   AddMetricDef("gauge1", TMetricKind::GAUGE, TUnit::NONE);
   AddMetricDef("gauge2", TMetricKind::GAUGE, TUnit::NONE);
   AddMetricDef("sum", TMetricKind::GAUGE, TUnit::NONE);
-  IntGauge* gauge1 = metrics.AddGauge<int64_t>("gauge1", 0);
-  IntGauge* gauge2 = metrics.AddGauge<int64_t>("gauge2", 0);
+  IntGauge* gauge1 = metrics.AddGauge("gauge1", 0);
+  IntGauge* gauge2 = metrics.AddGauge("gauge2", 0);
 
   vector<IntGauge*> gauges({gauge1, gauge2});
   IntGauge* sum_gauge =
-      metrics.RegisterMetric(new SumGauge<int64_t>(MetricDefs::Get("sum"), gauges));
+      metrics.RegisterMetric(new SumGauge(MetricDefs::Get("sum"), gauges));
 
   AssertValue(sum_gauge, 0, "0");
   gauge1->Increment(1);
@@ -132,14 +132,14 @@ TEST_F(MetricsTest, PropertyMetrics) {
   AddMetricDef("bool_property", TMetricKind::PROPERTY, TUnit::NONE);
   BooleanProperty* bool_property = metrics.AddProperty("bool_property", false);
   AssertValue(bool_property, false, "false");
-  bool_property->set_value(true);
+  bool_property->SetValue(true);
   AssertValue(bool_property, true, "true");
 
   AddMetricDef("string_property", TMetricKind::PROPERTY, TUnit::NONE);
   StringProperty* string_property = metrics.AddProperty("string_property",
       string("string1"));
   AssertValue(string_property, "string1", "string1");
-  string_property->set_value("string2");
+  string_property->SetValue("string2");
   AssertValue(string_property, "string2", "string2");
 }
 
@@ -147,11 +147,11 @@ TEST_F(MetricsTest, NonFiniteValues) {
   MetricGroup metrics("NanValues");
   AddMetricDef("inf_value", TMetricKind::GAUGE, TUnit::NONE);
   double inf = numeric_limits<double>::infinity();
-  DoubleGauge* gauge = metrics.AddGauge("inf_value", inf);
+  DoubleGauge* gauge = metrics.AddDoubleGauge("inf_value", inf);
   AssertValue(gauge, inf, "inf");
   double nan = numeric_limits<double>::quiet_NaN();
-  gauge->set_value(nan);
-  EXPECT_TRUE(std::isnan(gauge->value()));
+  gauge->SetValue(nan);
+  EXPECT_TRUE(std::isnan(gauge->GetValue()));
   EXPECT_TRUE(gauge->ToHumanReadable() == "nan");
 }
 
@@ -223,19 +223,19 @@ TEST_F(MetricsTest, MemMetric) {
       metrics.FindMetricForTesting<IntGauge>("tcmalloc.bytes-in-use");
   ASSERT_TRUE(bytes_in_use != NULL);
 
-  uint64_t cur_in_use = bytes_in_use->value();
+  uint64_t cur_in_use = bytes_in_use->GetValue();
   EXPECT_GT(cur_in_use, 0);
 
   // Allocate 100MB to increase the number of bytes used. TCMalloc may also give up some
   // bytes during this allocation, so this allocation is deliberately large to ensure that
   // the bytes used metric goes up net.
   scoped_ptr<vector<uint64_t>> chunk(new vector<uint64_t>(100 * 1024 * 1024));
-  EXPECT_GT(bytes_in_use->value(), cur_in_use);
+  EXPECT_GT(bytes_in_use->GetValue(), cur_in_use);
 
   IntGauge* total_bytes_reserved =
       metrics.FindMetricForTesting<IntGauge>("tcmalloc.total-bytes-reserved");
   ASSERT_TRUE(total_bytes_reserved != NULL);
-  ASSERT_GT(total_bytes_reserved->value(), 0);
+  ASSERT_GT(total_bytes_reserved->GetValue(), 0);
 
   IntGauge* pageheap_free_bytes =
       metrics.FindMetricForTesting<IntGauge>("tcmalloc.pageheap-free-bytes");
@@ -254,12 +254,12 @@ TEST_F(MetricsTest, JvmMetrics) {
       metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<IntGauge>(
           "jvm.total.current-usage-bytes");
   ASSERT_TRUE(jvm_total_used != NULL);
-  EXPECT_GT(jvm_total_used->value(), 0);
+  EXPECT_GT(jvm_total_used->GetValue(), 0);
   IntGauge* jvm_peak_total_used =
       metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<IntGauge>(
           "jvm.total.peak-current-usage-bytes");
   ASSERT_TRUE(jvm_peak_total_used != NULL);
-  EXPECT_GT(jvm_peak_total_used->value(), 0);
+  EXPECT_GT(jvm_peak_total_used->GetValue(), 0);
 }
 
 void AssertJson(const Value& val, const string& name, const string& value,
@@ -274,7 +274,7 @@ void AssertJson(const Value& val, const string& name, const string& value,
 TEST_F(MetricsTest, CountersJson) {
   MetricGroup metrics("CounterMetrics");
   AddMetricDef("counter", TMetricKind::COUNTER, TUnit::UNIT, "description");
-  metrics.AddCounter<int64_t>("counter", 0);
+  metrics.AddCounter("counter", 0);
   Document document;
   Value val;
   metrics.ToJson(true, &document, &val);
@@ -286,7 +286,7 @@ TEST_F(MetricsTest, CountersJson) {
 TEST_F(MetricsTest, GaugesJson) {
   MetricGroup metrics("GaugeMetrics");
   AddMetricDef("gauge", TMetricKind::GAUGE, TUnit::NONE);
-  metrics.AddGauge<int64_t>("gauge", 10);
+  metrics.AddGauge("gauge", 10);
   Document document;
   Value val;
   metrics.ToJson(true, &document, &val);

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 12d6df3..b513c1e 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -27,6 +27,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/locks.hpp>
 
+#include "common/atomic.h"
 #include "common/logging.h"
 #include "common/object-pool.h"
 #include "common/status.h"
@@ -118,59 +119,37 @@ class Metric {
   void AddStandardFields(rapidjson::Document* document, rapidjson::Value* val);
 };
 
-/// A SimpleMetric has a value which is a simple primitive type: e.g. integers, strings and
-/// floats. It is parameterised not only by the type of its value, but by both the unit
-/// (e.g. bytes/s), drawn from TUnit and the 'kind' of the metric itself. The kind
-/// can be one of: 'gauge', which may increase or decrease over time, a 'counter' which is
-/// increasing only over time, or a 'property' which is not numeric.
-//
-/// SimpleMetrics return their current value through the value() method. Access to value()
-/// is thread-safe.
-//
-/// TODO: We can use type traits to select a more efficient lock-free implementation of
-/// value() etc. where it is safe to do so.
-/// TODO: CalculateValue() can be returning a value, its current interface is not clean.
-template<typename T, TMetricKind::type metric_kind=TMetricKind::GAUGE>
-class SimpleMetric : public Metric {
+/// A ScalarMetric has a value which is a simple primitive type: e.g. integers, strings
+/// and floats. It is parameterised not only by the type of its value, but by both the
+/// unit (e.g. bytes/s), drawn from TUnit and the 'kind' of the metric itself.
+/// The kind can be one of:
+/// - 'gauge', which may increase or decrease over time,
+/// - 'counter' which can only increase over time
+/// - 'property' which is a value store which can be read and written only
+///
+/// Note that management software may use the metric kind as a hint on how to display
+/// the value. ScalarMetrics return their current value through the GetValue() method
+/// and set/initialize the value with SetValue(). Both methods are thread safe.
+template<typename T, TMetricKind::type metric_kind_t>
+class ScalarMetric: public Metric {
  public:
-  SimpleMetric(const TMetricDef& metric_def, const T& initial_value)
-      : Metric(metric_def), unit_(metric_def.units), value_(initial_value) {
-    DCHECK_EQ(metric_kind, metric_def.kind) << "Metric kind does not match definition: "
+  ScalarMetric(const TMetricDef& metric_def)
+    : Metric(metric_def), unit_(metric_def.units) {
+    DCHECK_EQ(metric_kind_t, metric_def.kind) << "Metric kind does not match definition: "
         << metric_def.key;
   }
 
-  virtual ~SimpleMetric() { }
-
-  /// Returns the current value, updating it if necessary. Thread-safe.
-  T value() {
-    boost::lock_guard<SpinLock> l(lock_);
-    CalculateValue();
-    return value_;
-  }
-
-  /// Sets the current value. Thread-safe.
-  void set_value(const T& value) {
-    boost::lock_guard<SpinLock> l(lock_);
-    value_ = value;
-  }
+  virtual ~ScalarMetric() { }
 
-  /// Adds 'delta' to the current value atomically.
-  void Increment(const T& delta) {
-    DCHECK(kind() != TMetricKind::PROPERTY)
-        << "Can't change value of PROPERTY metric: " << key();
-    DCHECK(kind() != TMetricKind::COUNTER || delta >= 0)
-        << "Can't decrement value of COUNTER metric: " << key();
-    if (delta == 0) return;
-    boost::lock_guard<SpinLock> l(lock_);
-    value_ += delta;
-  }
+  /// Returns the current value. Thread-safe.
+  virtual T GetValue() = 0;
 
-  virtual void ToJson(rapidjson::Document* document, rapidjson::Value* val) {
+  virtual void ToJson(rapidjson::Document* document, rapidjson::Value* val) override {
     rapidjson::Value container(rapidjson::kObjectType);
     AddStandardFields(document, &container);
 
     rapidjson::Value metric_value;
-    ToJsonValue(value(), TUnit::NONE, document, &metric_value);
+    ToJsonValue(GetValue(), TUnit::NONE, document, &metric_value);
     container.AddMember("value", metric_value, document->GetAllocator());
 
     rapidjson::Value type_value(PrintTMetricKind(kind()).c_str(),
@@ -181,30 +160,46 @@ class SimpleMetric : public Metric {
     *val = container;
   }
 
-  virtual std::string ToHumanReadable() {
-    return PrettyPrinter::Print(value(), unit());
+  virtual std::string ToHumanReadable() override {
+    return PrettyPrinter::Print(GetValue(), unit());
   }
 
-  virtual void ToLegacyJson(rapidjson::Document* document) {
+  virtual void ToLegacyJson(rapidjson::Document* document) override {
     rapidjson::Value val;
-    ToJsonValue(value(), TUnit::NONE, document, &val);
+    ToJsonValue(GetValue(), TUnit::NONE, document, &val);
     document->AddMember(key_.c_str(), val, document->GetAllocator());
   }
 
   TUnit::type unit() const { return unit_; }
-  TMetricKind::type kind() const { return metric_kind; }
+  TMetricKind::type kind() const { return metric_kind_t; }
 
  protected:
-  /// Called to compute value_ if necessary during calls to value(). The more natural
-  /// approach would be to have virtual T value(), but that's not possible in C++.
-  //
-  /// TODO: Should be cheap to have a blank implementation, but if required we can cause
-  /// the compiler to avoid calling this entirely through a compile-time constant.
-  virtual void CalculateValue() { }
-
   /// Units of this metric.
   const TUnit::type unit_;
+};
 
+/// An implementation of scalar metric with spinlock.
+template<typename T, TMetricKind::type metric_kind_t>
+class LockedMetric : public ScalarMetric<T, metric_kind_t> {
+ public:
+  LockedMetric(const TMetricDef& metric_def, const T& initial_value)
+    : ScalarMetric<T, metric_kind_t>(metric_def), value_(initial_value) {}
+
+  virtual ~LockedMetric() {}
+
+  /// Atomically reads the current value.
+  virtual T GetValue() override {
+    boost::lock_guard<SpinLock> l(lock_);
+    return value_;
+  }
+
+  /// Atomically sets the value.
+  void SetValue(const T& value) {
+    boost::lock_guard<SpinLock> l(lock_);
+    value_ = value;
+  }
+
+ protected:
   /// Guards access to value_.
   SpinLock lock_;
 
@@ -212,42 +207,81 @@ class SimpleMetric : public Metric {
   T value_;
 };
 
-// Gauge metric that computes the sum of several gauges.
-template <typename T>
-class SumGauge : public SimpleMetric<T, TMetricKind::GAUGE> {
+typedef class LockedMetric<bool, TMetricKind::PROPERTY> BooleanProperty;
+typedef class LockedMetric<std::string,TMetricKind::PROPERTY> StringProperty;
+typedef class LockedMetric<double, TMetricKind::GAUGE> DoubleGauge;
+
+/// An implementation of 'gauge' or 'counter' metric kind. The metric can be incremented
+/// atomically via the Increment() interface.
+template<TMetricKind::type metric_kind_t>
+class AtomicMetric : public ScalarMetric<int64_t, metric_kind_t> {
  public:
-  SumGauge(const TMetricDef& metric_def,
-      const std::vector<SimpleMetric<T, TMetricKind::GAUGE>*>& metrics)
-    : SimpleMetric<T, TMetricKind::GAUGE>(metric_def, 0), metrics_(metrics) {}
+  AtomicMetric(const TMetricDef& metric_def, const int64_t initial_value)
+    : ScalarMetric<int64_t, metric_kind_t>(metric_def), value_(initial_value) {
+    DCHECK(metric_kind_t == TMetricKind::GAUGE || metric_kind_t == TMetricKind::COUNTER);
+  }
+
+  virtual ~AtomicMetric() {}
+
+  /// Atomically reads the current value. May be overridden by derived classes.
+  /// The default implementation just atomically loads 'value_'. Derived classes
+  /// which derive the return value from multiple sources other than 'value_'
+  /// need to take care of synchronization among sources.
+  virtual int64_t GetValue() override { return value_.Load(); }
+
+  /// Atomically sets the value.
+  void SetValue(const int64_t& value) { value_.Store(value); }
+
+  /// Adds 'delta' to the current value atomically.
+  void Increment(int64_t delta) {
+    DCHECK(metric_kind_t != TMetricKind::COUNTER || delta >= 0)
+        << "Can't decrement value of COUNTER metric: " << this->key();
+    value_.Add(delta);
+  }
+
+ protected:
+  /// The current value of the metric.
+  AtomicInt64 value_;
+};
+
+/// We write 'Int' as a placeholder for all integer types.
+typedef class AtomicMetric<TMetricKind::GAUGE> IntGauge;
+typedef class AtomicMetric<TMetricKind::COUNTER> IntCounter;
+
+/// Gauge metric that computes the sum of several gauges.
+class SumGauge : public IntGauge {
+ public:
+  SumGauge(const TMetricDef& metric_def, const std::vector<IntGauge*>& gauges)
+    : IntGauge(metric_def, 0), gauges_(gauges) {}
+
   virtual ~SumGauge() {}
 
- private:
-  virtual void CalculateValue() override {
-    T sum = 0;
-    for (SimpleMetric<T, TMetricKind::GAUGE>* metric : metrics_) sum += metric->value();
-    this->value_ = sum;
+  virtual int64_t GetValue() override {
+    // Note that this doesn't hold the locks of all gauges before computing the sum so
+    // it's possible for one of the gauges to change after being read and added to sum.
+    int64_t sum = 0;
+    for (auto gauge : gauges_) sum += gauge->GetValue();
+    return sum;
   }
 
-  /// The metrics to be summed.
-  std::vector<SimpleMetric<T, TMetricKind::GAUGE>*> metrics_;
+ private:
+  /// The gauges to be summed.
+  std::vector<IntGauge*> gauges_;
 };
 
-// Gauge metric that negates another gauge.
-template <typename T>
-class NegatedGauge : public SimpleMetric<T, TMetricKind::GAUGE> {
+/// Gauge metric that negates another gauge.
+class NegatedGauge : public IntGauge {
  public:
-  NegatedGauge(const TMetricDef& metric_def,
-      SimpleMetric<T, TMetricKind::GAUGE>* metric)
-    : SimpleMetric<T, TMetricKind::GAUGE>(metric_def, 0), metric_(metric) {}
+  NegatedGauge(const TMetricDef& metric_def, IntGauge* gauge)
+    : IntGauge(metric_def, 0), gauge_(gauge) {}
+
   virtual ~NegatedGauge() {}
 
- private:
-  virtual void CalculateValue() override {
-    this->value_ = -metric_->value();
-  }
+  virtual int64_t GetValue() override { return -gauge_->GetValue(); }
 
+ private:
   /// The metric to be negated.
-  SimpleMetric<T, TMetricKind::GAUGE>* metric_;
+  IntGauge* gauge_;
 };
 
 /// Container for a set of metrics. A MetricGroup owns the memory for every metric
@@ -285,27 +319,28 @@ class MetricGroup {
   }
 
   /// Create a gauge metric object with given key and initial value (owned by this object)
-  template<typename T>
-  SimpleMetric<T>* AddGauge(const std::string& key, const T& value,
+  IntGauge* AddGauge(const std::string& key, const int64_t value,
       const std::string& metric_def_arg = "") {
-    return RegisterMetric(new SimpleMetric<T, TMetricKind::GAUGE>(
-        MetricDefs::Get(key, metric_def_arg), value));
+    return RegisterMetric(new IntGauge(MetricDefs::Get(key, metric_def_arg), value));
   }
 
-  template<typename T>
-  SimpleMetric<T, TMetricKind::PROPERTY>* AddProperty(const std::string& key,
-      const T& value, const std::string& metric_def_arg = "") {
-    return RegisterMetric(new SimpleMetric<T, TMetricKind::PROPERTY>(
-        MetricDefs::Get(key, metric_def_arg), value));
+  DoubleGauge* AddDoubleGauge(const std::string& key, const double value,
+      const std::string& metric_def_arg = "") {
+    return RegisterMetric(new DoubleGauge(MetricDefs::Get(key, metric_def_arg), value));
   }
 
   template<typename T>
-  SimpleMetric<T, TMetricKind::COUNTER>* AddCounter(const std::string& key,
+  LockedMetric<T, TMetricKind::PROPERTY>* AddProperty(const std::string& key,
       const T& value, const std::string& metric_def_arg = "") {
-    return RegisterMetric(new SimpleMetric<T, TMetricKind::COUNTER>(
+    return RegisterMetric(new LockedMetric<T, TMetricKind::PROPERTY>(
         MetricDefs::Get(key, metric_def_arg), value));
   }
 
+  IntCounter* AddCounter(const std::string& key, const int64_t value,
+      const std::string& metric_def_arg = "") {
+    return RegisterMetric(new IntCounter(MetricDefs::Get(key, metric_def_arg), value));
+  }
+
   /// Returns a metric by key. All MetricGroups reachable from this group are searched in
   /// depth-first order, starting with the root group.  Returns NULL if there is no metric
   /// with that key. This is not a very cheap operation; the result should be cached where
@@ -380,13 +415,6 @@ class MetricGroup {
       rapidjson::Document* document);
 };
 
-/// We write 'Int' as a placeholder for all integer types.
-typedef class SimpleMetric<int64_t, TMetricKind::GAUGE> IntGauge;
-typedef class SimpleMetric<double, TMetricKind::GAUGE> DoubleGauge;
-typedef class SimpleMetric<int64_t, TMetricKind::COUNTER> IntCounter;
-
-typedef class SimpleMetric<bool, TMetricKind::PROPERTY> BooleanProperty;
-typedef class SimpleMetric<std::string, TMetricKind::PROPERTY> StringProperty;
 
 /// Convenience method to instantiate a TMetricDef with a subset of its fields defined.
 /// Most externally-visible metrics should be defined in metrics.json and retrieved via

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index 536119b..8397f35 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -194,10 +194,8 @@ Status ThreadMgr::StartInstrumentation(MetricGroup* metrics) {
   DCHECK(metrics != NULL);
   lock_guard<mutex> l(lock_);
   metrics_enabled_ = true;
-  total_threads_metric_ = metrics->AddGauge<int64_t>(
-      "thread-manager.total-threads-created", 0L);
-  current_num_threads_metric_ = metrics->AddGauge<int64_t>(
-      "thread-manager.running-threads", 0L);
+  total_threads_metric_ = metrics->AddGauge("thread-manager.total-threads-created", 0L);
+  current_num_threads_metric_ = metrics->AddGauge("thread-manager.running-threads", 0L);
   return Status::OK();
 }
 
@@ -224,7 +222,7 @@ void ThreadMgr::RemoveThread(const thread::id& boost_id, const string& category)
 void ThreadMgr::GetThreadOverview(Document* document) {
   lock_guard<mutex> l(lock_);
   if (metrics_enabled_) {
-    document->AddMember("total_threads", current_num_threads_metric_->value(),
+    document->AddMember("total_threads", current_num_threads_metric_->GetValue(),
         document->GetAllocator());
   }
   Value lst(kArrayType);

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index dafe986..f493d33 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -882,7 +882,7 @@
       "IMPALAD"
     ],
     "label": "StateStore Subscriber Last Recovery Duration",
-    "units": "NONE",
+    "units": "TIME_S",
     "kind": "GAUGE",
     "key": "statestore-subscriber.last-recovery-duration"
   },