You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/01/18 23:55:52 UTC

[1/3] impala git commit: IMPALA-6382: Cap spillable buffer size and max row size query options

Repository: impala
Updated Branches:
  refs/heads/master ca7d03cfe -> e714f2b33


IMPALA-6382: Cap spillable buffer size and max row size query options

Currently the default and min spillable buffer size and max row size
query options accept any valid int64 value. Since the planner depends
on these values for memory estimations, if a very large value close to
the limits of int64 is set, the variables representing or relying on
these estimates can overflow during different phases of query execution.

This patch puts a reasonable upper limit of 1TB to these query options
to prevent such a situation.

Testing:
Added backend query option tests.

Change-Id: I36d3915f7019b13c3eb06f08bfdb38c71ec864f1
Reviewed-on: http://gerrit.cloudera.org:8080/9023
Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/028a83e6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/028a83e6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/028a83e6

Branch: refs/heads/master
Commit: 028a83e6543a18dd3b9161226355f1e8d36c4ed7
Parents: ca7d03c
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Fri Jan 12 17:23:15 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 18 23:08:26 2018 +0000

----------------------------------------------------------------------
 be/src/service/query-options-test.cc            | 12 +++++++++-
 be/src/service/query-options.cc                 | 23 +++++++++++++++-----
 be/src/service/query-options.h                  |  3 +++
 .../functional-query/queries/QueryTest/set.test |  4 ++--
 4 files changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/028a83e6/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index efc5307..80c9866 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -140,7 +140,7 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(max_scan_range_length), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(rm_initial_mem),        {-1, I64_MAX}},
       {MAKE_OPTIONDEF(buffer_pool_limit),     {-1, I64_MAX}},
-      {MAKE_OPTIONDEF(max_row_size),          {1, I64_MAX}},
+      {MAKE_OPTIONDEF(max_row_size),          {1, ROW_SIZE_LIMIT}},
       {MAKE_OPTIONDEF(parquet_file_size),     {-1, I32_MAX}}
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32 {
@@ -302,8 +302,18 @@ TEST(QueryOptions, SetSpecialOptions) {
       TestOk("2MB", 2 * 1024 * 1024);
       TestOk("32G", 32ll * 1024 * 1024 * 1024);
       TestError("10MB");
+      TestOk(to_string(SPILLABLE_BUFFER_LIMIT).c_str(), SPILLABLE_BUFFER_LIMIT);
+      TestError(to_string(2 * SPILLABLE_BUFFER_LIMIT).c_str());
     }
   }
+  // MAX_ROW_SIZE should be between 1 and ROW_SIZE_LIMIT
+  {
+    OptionDef<int64_t> key_def = MAKE_OPTIONDEF(max_row_size);
+    auto TestError = MakeTestErrFn(options, key_def);
+    TestError("-1");
+    TestError("0");
+    TestError(to_string(ROW_SIZE_LIMIT + 1).c_str());
+  }
 }
 
 TEST(QueryOptions, ParseQueryOptions) {

http://git-wip-us.apache.org/repos/asf/impala/blob/028a83e6/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index e8e8c7b..e3b5a1f 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -552,7 +552,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
             ParseMemValue(value, "Default spillable buffer size", &buffer_size_bytes));
         if (!BitUtil::IsPowerOf2(buffer_size_bytes)) {
           return Status(
-              Substitute("Buffer size must be a power of two: $0", buffer_size_bytes));
+              Substitute("Default spillable buffer size must be a power of two: $0",
+                  buffer_size_bytes));
+        }
+        if (buffer_size_bytes > SPILLABLE_BUFFER_LIMIT) {
+          return Status(Substitute(
+              "Default spillable buffer size must be less than or equal to: $0",
+              SPILLABLE_BUFFER_LIMIT));
         }
         query_options->__set_default_spillable_buffer_size(buffer_size_bytes);
         break;
@@ -563,7 +569,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
             ParseMemValue(value, "Minimum spillable buffer size", &buffer_size_bytes));
         if (!BitUtil::IsPowerOf2(buffer_size_bytes)) {
           return Status(
-              Substitute("Buffer size must be a power of two: $0", buffer_size_bytes));
+              Substitute("Minimum spillable buffer size must be a power of two: $0",
+                  buffer_size_bytes));
+        }
+        if (buffer_size_bytes > SPILLABLE_BUFFER_LIMIT) {
+          return Status(Substitute(
+              "Minimum spillable buffer size must be less than or equal to: $0",
+              SPILLABLE_BUFFER_LIMIT));
         }
         query_options->__set_min_spillable_buffer_size(buffer_size_bytes);
         break;
@@ -571,9 +583,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
       case TImpalaQueryOptions::MAX_ROW_SIZE: {
         int64_t max_row_size_bytes;
         RETURN_IF_ERROR(ParseMemValue(value, "Max row size", &max_row_size_bytes));
-        if (max_row_size_bytes <= 0) {
-          return Status(Substitute(
-              "Max row size must be a positive number of bytes: $0", value));
+        if (max_row_size_bytes <= 0 || max_row_size_bytes > ROW_SIZE_LIMIT) {
+          return Status(
+              Substitute("Invalid max row size of $0. Valid sizes are in [$1, $2]", value,
+                  1, ROW_SIZE_LIMIT));
         }
         query_options->__set_max_row_size(max_row_size_bytes);
         break;

http://git-wip-us.apache.org/repos/asf/impala/blob/028a83e6/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index b7065f4..be3607f 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -132,6 +132,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(idle_session_timeout, IDLE_SESSION_TIMEOUT, TQueryOptionLevel::REGULAR)\
   ;
 
+/// Enforce practical limits on some query options to avoid undesired query state.
+  static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
+  static const int64_t ROW_SIZE_LIMIT = 1LL << 40; // 1 TB
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.  Options that
 /// aren't set and lack defaults in common/thrift/ImpalaInternalService.thrift are

http://git-wip-us.apache.org/repos/asf/impala/blob/028a83e6/testdata/workloads/functional-query/queries/QueryTest/set.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index 14ac629..32ad938 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -234,10 +234,10 @@ explain select count(distinct double_col) from functional.alltypesagg;
 ---- QUERY
 set max_row_size=-1;
 ---- CATCH
-Max row size must be a positive number of bytes: -1
+Invalid max row size of -1. Valid sizes are in [1, 1099511627776]
 ====
 ---- QUERY
 set max_row_size=0;
 ---- CATCH
-Max row size must be a positive number of bytes: 0
+Invalid max row size of 0. Valid sizes are in [1, 1099511627776]
 ====


[3/3] impala git commit: IMPALA-2397: Use atomics for IntGauge and IntCounter

Posted by kw...@apache.org.
IMPALA-2397: Use atomics for IntGauge and IntCounter

This change removes the spinlock in IntGauge and IntCounter
and uses AtomicInt64 instead. As shown in IMPALA-2397, multiple
threads can be contending for the spinlocks of some global metrics
under concurrent queries.

This change also breaks up SimpleMetric is renamed to ScalarMetric
and broken into two subclasses:
- LockedMetric:
  - a value store for any primitive type (int,float,string etc).
  - atomic read and write via GetValue() and SetValue() respectively.

- AtomicMetric:
  - the basis of IntGauge and IntCounter. Support atomic increment
    of the metric value via Increment() interface.
  - atomic read and write via GetValue() and SetValue() respectively.
  - only support int64_t type.

Change-Id: I48dfa5443cd771916b53541a0ffeaf1bcc7e7606
Reviewed-on: http://gerrit.cloudera.org:8080/9012
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e714f2b3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e714f2b3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e714f2b3

Branch: refs/heads/master
Commit: e714f2b33c5b64d5680dbc15e166759930f04560
Parents: b3d38b5
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Jan 10 19:28:09 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 18 23:31:52 2018 +0000

----------------------------------------------------------------------
 be/src/exec/external-data-source-executor.cc |   8 +-
 be/src/rpc/TAcceptQueueServer.cpp            |   2 +-
 be/src/rpc/thrift-server.cc                  |   4 +-
 be/src/runtime/client-cache.cc               |   6 +-
 be/src/runtime/data-stream-mgr.cc            |   6 +-
 be/src/runtime/exec-env.cc                   |  10 +-
 be/src/runtime/io/scan-range.cc              |   4 +-
 be/src/runtime/krpc-data-stream-mgr.cc       |   6 +-
 be/src/runtime/mem-tracker-test.cc           |   4 +-
 be/src/runtime/mem-tracker.cc                |  10 +-
 be/src/runtime/mem-tracker.h                 |   4 +-
 be/src/runtime/query-exec-mgr.cc             |   2 +-
 be/src/runtime/query-state.cc                |   6 +-
 be/src/runtime/tmp-file-mgr-test.cc          |   2 +-
 be/src/runtime/tmp-file-mgr.cc               |   4 +-
 be/src/scheduling/admission-controller.cc    |  58 +++---
 be/src/scheduling/scheduler.cc               |   9 +-
 be/src/service/impala-server.cc              |  10 +-
 be/src/service/session-expiry-test.cc        |  12 +-
 be/src/statestore/statestore-subscriber.cc   |  14 +-
 be/src/statestore/statestore.cc              |  11 +-
 be/src/util/common-metrics.cc                |   2 +-
 be/src/util/default-path-handlers.cc         |   2 +-
 be/src/util/impalad-metrics.cc               |  66 +++----
 be/src/util/memory-metrics.cc                |  99 +++++-----
 be/src/util/memory-metrics.h                 |  42 ++--
 be/src/util/metrics-test.cc                  |  44 ++---
 be/src/util/metrics.h                        | 222 ++++++++++++----------
 be/src/util/thread.cc                        |   8 +-
 common/thrift/metrics.json                   |   2 +-
 30 files changed, 350 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc
index 7c810c6..7c54f39 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -76,9 +76,9 @@ class ExternalDataSourceExecutor::JniState {
         "getNumClassCacheMisses", "()J");
     RETURN_ERROR_IF_EXC(env);
 
-    num_class_cache_hits_ = metrics->AddCounter<int64_t>(
+    num_class_cache_hits_ = metrics->AddCounter(
         "external-data-source.class-cache.hits", 0);
-    num_class_cache_misses_ = metrics->AddCounter<int64_t>(
+    num_class_cache_misses_ = metrics->AddCounter(
         "external-data-source.class-cache.misses", 0);
     return Status::OK();
   }
@@ -92,11 +92,11 @@ class ExternalDataSourceExecutor::JniState {
     int64_t num_cache_hits = env->CallStaticLongMethod(executor_class_,
         get_num_cache_hits_id_);
     RETURN_ERROR_IF_EXC(env);
-    num_class_cache_hits_->set_value(num_cache_hits);
+    num_class_cache_hits_->SetValue(num_cache_hits);
     int64_t num_cache_misses = env->CallStaticLongMethod(executor_class_,
         get_num_cache_misses_id_);
     RETURN_ERROR_IF_EXC(env);
-    num_class_cache_misses_->set_value(num_cache_misses);
+    num_class_cache_misses_->SetValue(num_cache_misses);
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 8a398a2..5c1b1da 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -286,7 +286,7 @@ void TAcceptQueueServer::InitMetrics(MetricGroup* metrics, const string& key_pre
   DCHECK(metrics != NULL);
   stringstream queue_size_ss;
   queue_size_ss << key_prefix << ".connection-setup-queue-size";
-  queue_size_metric_ = metrics->AddGauge<int64_t>(queue_size_ss.str(), 0);
+  queue_size_metric_ = metrics->AddGauge(queue_size_ss.str(), 0);
   metrics_enabled_ = true;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index ab51315..eaca699 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -342,10 +342,10 @@ ThriftServer::ThriftServer(const string& name,
     metrics_enabled_ = true;
     stringstream count_ss;
     count_ss << "impala.thrift-server." << name << ".connections-in-use";
-    num_current_connections_metric_ = metrics->AddGauge<int64_t>(count_ss.str(), 0);
+    num_current_connections_metric_ = metrics->AddGauge(count_ss.str(), 0);
     stringstream max_ss;
     max_ss << "impala.thrift-server." << name << ".total-connections";
-    total_connections_metric_ = metrics->AddCounter<int64_t>(max_ss.str(), 0);
+    total_connections_metric_ = metrics->AddCounter(max_ss.str(), 0);
     metrics_ = metrics;
   } else {
     metrics_enabled_ = false;

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/client-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index 8c0b6aa..af530f7 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -94,7 +94,7 @@ Status ClientCacheHelper::ReopenClient(ClientFactory factory_method,
     // CreateClient() will increment total_clients_metric_ if succeed.
     if (metrics_enabled_) {
       total_clients_metric_->Increment(-1);
-      DCHECK_GE(total_clients_metric_->value(), 0);
+      DCHECK_GE(total_clients_metric_->GetValue(), 0);
     }
     lock_guard<mutex> lock(client_map_lock_);
     client_map_.erase(client);
@@ -235,11 +235,11 @@ void ClientCacheHelper::InitMetrics(MetricGroup* metrics, const string& key_pref
   lock_guard<mutex> lock(cache_lock_);
   stringstream count_ss;
   count_ss << key_prefix << ".client-cache.clients-in-use";
-  clients_in_use_metric_ = metrics->AddGauge<int64_t>(count_ss.str(), 0);
+  clients_in_use_metric_ = metrics->AddGauge(count_ss.str(), 0);
 
   stringstream max_ss;
   max_ss << key_prefix << ".client-cache.total-clients";
-  total_clients_metric_ = metrics->AddGauge<int64_t>(max_ss.str(), 0);
+  total_clients_metric_ = metrics->AddGauge(max_ss.str(), 0);
   metrics_enabled_ = true;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index 93c524e..45eee7f 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -57,10 +57,10 @@ namespace impala {
 DataStreamMgr::DataStreamMgr(MetricGroup* metrics) {
   metrics_ = metrics->GetOrCreateChildGroup("datastream-manager");
   num_senders_waiting_ =
-      metrics_->AddGauge<int64_t>("senders-blocked-on-recvr-creation", 0L);
+      metrics_->AddGauge("senders-blocked-on-recvr-creation", 0L);
   total_senders_waited_ =
-      metrics_->AddCounter<int64_t>("total-senders-blocked-on-recvr-creation", 0L);
-  num_senders_timedout_ = metrics_->AddCounter<int64_t>(
+      metrics_->AddCounter("total-senders-blocked-on-recvr-creation", 0L);
+  num_senders_timedout_ = metrics_->AddCounter(
       "total-senders-timedout-waiting-for-recvr-creation", 0L);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 6d9a857..f191921 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -333,9 +333,9 @@ Status ExecEnv::Init() {
   // Also need a MemTracker for unused reservations as a negative value. Unused
   // reservations are counted against queries but not against the process memory
   // consumption. This accounts for that difference.
-  IntGauge* negated_unused_reservation = obj_pool_->Add(new NegatedGauge<int64_t>(
-        MakeTMetricDef("negated_unused_reservation", TMetricKind::GAUGE, TUnit::BYTES),
-        BufferPoolMetric::UNUSED_RESERVATION_BYTES));
+  IntGauge* negated_unused_reservation = obj_pool_->Add(new NegatedGauge(
+      MakeTMetricDef("negated_unused_reservation", TMetricKind::GAUGE, TUnit::BYTES),
+      BufferPoolMetric::UNUSED_RESERVATION_BYTES));
   obj_pool_->Add(new MemTracker(negated_unused_reservation, -1,
       "Buffer Pool: Unused Reservation", mem_tracker_.get()));
 #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
@@ -350,13 +350,13 @@ Status ExecEnv::Init() {
   // reserved (TcmallocMetric::PHYSICAL_BYTES_RESERVED) and the bytes in use
   // (TcmallocMetrics::BYTES_IN_USE). This overhead accounts for all the cached freelists
   // used by TCMalloc.
-  IntGauge* negated_bytes_in_use = obj_pool_->Add(new NegatedGauge<int64_t>(
+  IntGauge* negated_bytes_in_use = obj_pool_->Add(new NegatedGauge(
       MakeTMetricDef("negated_tcmalloc_bytes_in_use", TMetricKind::GAUGE, TUnit::BYTES),
       TcmallocMetric::BYTES_IN_USE));
   vector<IntGauge*> overhead_metrics;
   overhead_metrics.push_back(negated_bytes_in_use);
   overhead_metrics.push_back(TcmallocMetric::PHYSICAL_BYTES_RESERVED);
-  SumGauge<int64_t>* tcmalloc_overhead = obj_pool_->Add(new SumGauge<int64_t>(
+  SumGauge* tcmalloc_overhead = obj_pool_->Add(new SumGauge(
       MakeTMetricDef("tcmalloc_overhead", TMetricKind::GAUGE, TUnit::BYTES),
       overhead_metrics));
   obj_pool_->Add(

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index dc14050..21daa96 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -335,8 +335,8 @@ void ScanRange::Close() {
       struct hdfsHedgedReadMetrics* hedged_metrics;
       int success = hdfsGetHedgedReadMetrics(fs_, &hedged_metrics);
       if (success == 0) {
-        ImpaladMetrics::HEDGED_READ_OPS->set_value(hedged_metrics->hedgedReadOps);
-        ImpaladMetrics::HEDGED_READ_OPS_WIN->set_value(hedged_metrics->hedgedReadOpsWin);
+        ImpaladMetrics::HEDGED_READ_OPS->SetValue(hedged_metrics->hedgedReadOps);
+        ImpaladMetrics::HEDGED_READ_OPS_WIN->SetValue(hedged_metrics->hedgedReadOpsWin);
         hdfsFreeHedgedReadMetrics(hedged_metrics);
       }
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 348b9ab..86955c8 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -63,10 +63,10 @@ KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
       boost::bind(&KrpcDataStreamMgr::DeserializeThreadFn, this, _1, _2)) {
   MetricGroup* dsm_metrics = metrics->GetOrCreateChildGroup("datastream-manager");
   num_senders_waiting_ =
-      dsm_metrics->AddGauge<int64_t>("senders-blocked-on-recvr-creation", 0L);
+      dsm_metrics->AddGauge("senders-blocked-on-recvr-creation", 0L);
   total_senders_waited_ =
-      dsm_metrics->AddCounter<int64_t>("total-senders-blocked-on-recvr-creation", 0L);
-  num_senders_timedout_ = dsm_metrics->AddCounter<int64_t>(
+      dsm_metrics->AddCounter("total-senders-blocked-on-recvr-creation", 0L);
+  num_senders_timedout_ = dsm_metrics->AddCounter(
       "total-senders-timedout-waiting-for-recvr-creation", 0L);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/mem-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-test.cc b/be/src/runtime/mem-tracker-test.cc
index 4aaac05..faeb6a9 100644
--- a/be/src/runtime/mem-tracker-test.cc
+++ b/be/src/runtime/mem-tracker-test.cc
@@ -62,13 +62,13 @@ TEST(MemTestTest, ConsumptionMetric) {
   md.__set_units(TUnit::BYTES);
   md.__set_kind(TMetricKind::GAUGE);
   IntGauge metric(md, 0);
-  EXPECT_EQ(metric.value(), 0);
+  EXPECT_EQ(metric.GetValue(), 0);
 
   TMetricDef neg_md;
   neg_md.__set_key("neg_test");
   neg_md.__set_units(TUnit::BYTES);
   neg_md.__set_kind(TMetricKind::GAUGE);
-  NegatedGauge<int64_t> neg_metric(neg_md, &metric);
+  NegatedGauge neg_metric(neg_md, &metric);
 
   MemTracker t(&metric, 100, "");
   MemTracker neg_t(&neg_metric, 100, "");

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 98f45db..e5aa290 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -211,16 +211,16 @@ MemTracker::~MemTracker() {
 }
 
 void MemTracker::RegisterMetrics(MetricGroup* metrics, const string& prefix) {
-  num_gcs_metric_ = metrics->AddCounter<int64_t>(Substitute("$0.num-gcs", prefix), 0);
+  num_gcs_metric_ = metrics->AddCounter(Substitute("$0.num-gcs", prefix), 0);
 
   // TODO: Consider a total amount of bytes freed counter
-  bytes_freed_by_last_gc_metric_ = metrics->AddGauge<int64_t>(
+  bytes_freed_by_last_gc_metric_ = metrics->AddGauge(
       Substitute("$0.bytes-freed-by-last-gc", prefix), -1);
 
-  bytes_over_limit_metric_ = metrics->AddGauge<int64_t>(
+  bytes_over_limit_metric_ = metrics->AddGauge(
       Substitute("$0.bytes-over-limit", prefix), -1);
 
-  limit_metric_ = metrics->AddGauge<int64_t>(Substitute("$0.limit", prefix), limit_);
+  limit_metric_ = metrics->AddGauge(Substitute("$0.limit", prefix), limit_);
 }
 
 // Calling this on the query tracker results in output like:
@@ -430,7 +430,7 @@ bool MemTracker::GcMemory(int64_t max_consumption) {
   }
 
   if (bytes_freed_by_last_gc_metric_ != NULL) {
-    bytes_freed_by_last_gc_metric_->set_value(pre_gc_consumption - curr_consumption);
+    bytes_freed_by_last_gc_metric_->SetValue(pre_gc_consumption - curr_consumption);
   }
   return curr_consumption > max_consumption;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index fb1cd90..c582d72 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -250,7 +250,7 @@ class MemTracker {
   bool LimitExceeded() {
     if (UNLIKELY(CheckLimitExceeded())) {
       if (bytes_over_limit_metric_ != NULL) {
-        bytes_over_limit_metric_->set_value(consumption() - limit_);
+        bytes_over_limit_metric_->SetValue(consumption() - limit_);
       }
       return GcMemory(limit_);
     }
@@ -274,7 +274,7 @@ class MemTracker {
   /// call if this tracker has a consumption metric.
   void RefreshConsumptionFromMetric() {
     DCHECK(consumption_metric_ != nullptr);
-    consumption_->Set(consumption_metric_->value());
+    consumption_->Set(consumption_metric_->GetValue());
   }
 
   int64_t limit() const { return limit_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 4f30f4e..316b712 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -123,7 +123,7 @@ void QueryExecMgr::StartQueryHelper(QueryState* qs) {
 #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
   // tcmalloc and address or thread sanitizer cannot be used together
   if (FLAGS_log_mem_usage_interval > 0) {
-    uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value();
+    uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->GetValue();
     if (num_complete % FLAGS_log_mem_usage_interval == 0) {
       char buf[2048];
       // This outputs how much memory is currently being used by this impalad

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 10c8033..259cd34 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -381,11 +381,13 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
       << " fragment_idx=" << fis->instance_ctx().fragment_idx
       << " per_fragment_instance_idx=" << fis->instance_ctx().per_fragment_instance_idx
       << " coord_state_idx=" << rpc_params().coord_state_idx
-      << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value();
+      << " #in-flight="
+      << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
   Status status = fis->Exec();
   ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
   VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id())
-      << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value()
+      << " #in-flight="
+      << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue()
       << " status=" << status;
   // initiate cancellation if nobody has done so yet
   if (!status.ok()) Cancel();

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index fbc0a36..3091c58 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -78,7 +78,7 @@ class TmpFileMgrTest : public ::testing::Test {
     vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->ActiveTmpDevices();
     IntGauge* active_metric =
         metrics_->FindMetricForTesting<IntGauge>("tmp-file-mgr.active-scratch-dirs");
-    EXPECT_EQ(active.size(), active_metric->value());
+    EXPECT_EQ(active.size(), active_metric->GetValue());
     SetMetric<string>* active_set_metric =
         metrics_->FindMetricForTesting<SetMetric<string>>(
         "tmp-file-mgr.active-scratch-dirs.list");

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 650af0b..d35d302 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -132,10 +132,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
 
   DCHECK(metrics != nullptr);
   num_active_scratch_dirs_metric_ =
-      metrics->AddGauge<int64_t>(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
+      metrics->AddGauge(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
   active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(
       metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
-  num_active_scratch_dirs_metric_->set_value(tmp_dirs_.size());
+  num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size());
   for (int i = 0; i < tmp_dirs_.size(); ++i) {
     active_scratch_dirs_metric_->Add(tmp_dirs_[i]);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 99f659a..f43af2c 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -482,9 +482,9 @@ bool AdmissionController::RejectImmediately(QuerySchedule* schedule,
 }
 
 void AdmissionController::PoolStats::UpdateConfigMetrics(const TPoolConfig& pool_cfg) {
-  metrics_.pool_max_mem_resources->set_value(pool_cfg.max_mem_resources);
-  metrics_.pool_max_requests->set_value(pool_cfg.max_requests);
-  metrics_.pool_max_queued->set_value(pool_cfg.max_queued);
+  metrics_.pool_max_mem_resources->SetValue(pool_cfg.max_mem_resources);
+  metrics_.pool_max_requests->SetValue(pool_cfg.max_requests);
+  metrics_.pool_max_queued->SetValue(pool_cfg.max_queued);
 }
 
 Status AdmissionController::AdmitQuery(QuerySchedule* schedule) {
@@ -734,18 +734,18 @@ void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reser
 
   if (agg_num_running_ == num_running && agg_num_queued_ == num_queued &&
       agg_mem_reserved_ == mem_reserved) {
-    DCHECK_EQ(num_running, metrics_.agg_num_running->value());
-    DCHECK_EQ(num_queued, metrics_.agg_num_queued->value());
-    DCHECK_EQ(mem_reserved, metrics_.agg_mem_reserved->value());
+    DCHECK_EQ(num_running, metrics_.agg_num_running->GetValue());
+    DCHECK_EQ(num_queued, metrics_.agg_num_queued->GetValue());
+    DCHECK_EQ(mem_reserved, metrics_.agg_mem_reserved->GetValue());
     return;
   }
   VLOG_ROW << "Recomputed agg stats, previous: " << DebugString();
   agg_num_running_ = num_running;
   agg_num_queued_ = num_queued;
   agg_mem_reserved_ = mem_reserved;
-  metrics_.agg_num_running->set_value(num_running);
-  metrics_.agg_num_queued->set_value(num_queued);
-  metrics_.agg_mem_reserved->set_value(mem_reserved);
+  metrics_.agg_num_running->SetValue(num_running);
+  metrics_.agg_num_queued->SetValue(num_queued);
+  metrics_.agg_mem_reserved->SetValue(mem_reserved);
   VLOG_ROW << "Updated: " << DebugString();
 }
 
@@ -782,12 +782,12 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() {
   if (current_reserved != local_stats_.backend_mem_reserved) {
     parent_->pools_for_updates_.insert(name_);
     local_stats_.backend_mem_reserved = current_reserved;
-    metrics_.local_backend_mem_reserved->set_value(current_reserved);
+    metrics_.local_backend_mem_reserved->SetValue(current_reserved);
   }
 
   const int64_t current_usage =
       tracker == nullptr ? static_cast<int64_t>(0) : tracker->consumption();
-  metrics_.local_backend_mem_usage->set_value(current_usage);
+  metrics_.local_backend_mem_usage->SetValue(current_usage);
 }
 
 void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
@@ -906,44 +906,44 @@ AdmissionController::GetPoolStats(const string& pool_name) {
 }
 
 void AdmissionController::PoolStats::InitMetrics() {
-  metrics_.total_admitted = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_admitted = parent_->metrics_group_->AddCounter(
       TOTAL_ADMITTED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.total_queued = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_queued = parent_->metrics_group_->AddCounter(
       TOTAL_QUEUED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.total_dequeued = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_dequeued = parent_->metrics_group_->AddCounter(
       TOTAL_DEQUEUED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.total_rejected = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_rejected = parent_->metrics_group_->AddCounter(
       TOTAL_REJECTED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.total_timed_out = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_timed_out = parent_->metrics_group_->AddCounter(
       TOTAL_TIMED_OUT_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.total_released = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.total_released = parent_->metrics_group_->AddCounter(
       TOTAL_RELEASED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.time_in_queue_ms = parent_->metrics_group_->AddCounter<int64_t>(
+  metrics_.time_in_queue_ms = parent_->metrics_group_->AddCounter(
       TIME_IN_QUEUE_METRIC_KEY_FORMAT, 0, name_);
 
-  metrics_.agg_num_running = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.agg_num_running = parent_->metrics_group_->AddGauge(
       AGG_NUM_RUNNING_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.agg_num_queued = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.agg_num_queued = parent_->metrics_group_->AddGauge(
       AGG_NUM_QUEUED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.agg_mem_reserved = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.agg_mem_reserved = parent_->metrics_group_->AddGauge(
       AGG_MEM_RESERVED_METRIC_KEY_FORMAT, 0, name_);
 
-  metrics_.local_mem_admitted = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.local_mem_admitted = parent_->metrics_group_->AddGauge(
       LOCAL_MEM_ADMITTED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.local_num_admitted_running = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.local_num_admitted_running = parent_->metrics_group_->AddGauge(
       LOCAL_NUM_ADMITTED_RUNNING_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.local_num_queued = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.local_num_queued = parent_->metrics_group_->AddGauge(
       LOCAL_NUM_QUEUED_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.local_backend_mem_usage = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.local_backend_mem_usage = parent_->metrics_group_->AddGauge(
       LOCAL_BACKEND_MEM_USAGE_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.local_backend_mem_reserved = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.local_backend_mem_reserved = parent_->metrics_group_->AddGauge(
       LOCAL_BACKEND_MEM_RESERVED_METRIC_KEY_FORMAT, 0, name_);
 
-  metrics_.pool_max_mem_resources = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.pool_max_mem_resources = parent_->metrics_group_->AddGauge(
       POOL_MAX_MEM_RESOURCES_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.pool_max_requests = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.pool_max_requests = parent_->metrics_group_->AddGauge(
       POOL_MAX_REQUESTS_METRIC_KEY_FORMAT, 0, name_);
-  metrics_.pool_max_queued = parent_->metrics_group_->AddGauge<int64_t>(
+  metrics_.pool_max_queued = parent_->metrics_group_->AddGauge(
       POOL_MAX_QUEUED_METRIC_KEY_FORMAT, 0, name_);
 }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5cf0f01..e924f50 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -97,11 +97,10 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
     // This is after registering with the statestored, so we already have to synchronize
     // access to the executors_config_ shared_ptr.
     int num_backends = GetExecutorsConfig()->NumBackends();
-    total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0);
-    total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0);
+    total_assignments_ = metrics_->AddCounter(ASSIGNMENTS_KEY, 0);
+    total_local_assignments_ = metrics_->AddCounter(LOCAL_ASSIGNMENTS_KEY, 0);
     initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
-    num_fragment_instances_metric_ =
-        metrics_->AddGauge<int64_t>(NUM_BACKENDS_KEY, num_backends);
+    num_fragment_instances_metric_ = metrics_->AddGauge(NUM_BACKENDS_KEY, num_backends);
   }
 
   if (statestore_subscriber_ != nullptr) {
@@ -197,7 +196,7 @@ void Scheduler::UpdateMembership(
 
   if (metrics_ != nullptr) {
     /// TODO-MT: fix this (do we even need to report it?)
-    num_fragment_instances_metric_->set_value(current_executors_.size());
+    num_fragment_instances_metric_->SetValue(current_executors_.size());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 6358145..a62130c 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1059,8 +1059,8 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
 Status ImpalaServer::UpdateCatalogMetrics() {
   TGetDbsResult dbs;
   RETURN_IF_ERROR(exec_env_->frontend()->GetDbs(nullptr, nullptr, &dbs));
-  ImpaladMetrics::CATALOG_NUM_DBS->set_value(dbs.dbs.size());
-  ImpaladMetrics::CATALOG_NUM_TABLES->set_value(0L);
+  ImpaladMetrics::CATALOG_NUM_DBS->SetValue(dbs.dbs.size());
+  ImpaladMetrics::CATALOG_NUM_TABLES->SetValue(0L);
   for (const TDatabase& db: dbs.dbs) {
     TGetTablesResult table_names;
     RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db.db_name, nullptr, nullptr,
@@ -1433,7 +1433,7 @@ void ImpalaServer::CatalogUpdateCallback(
       TTopicDelta& update = subscriber_topic_updates->back();
       update.topic_name = CatalogServer::IMPALA_CATALOG_TOPIC;
       update.__set_from_version(0L);
-      ImpaladMetrics::CATALOG_READY->set_value(false);
+      ImpaladMetrics::CATALOG_READY->SetValue(false);
       // Dropped all cached lib files (this behaves as if all functions and data
       // sources are dropped).
       LibCache::instance()->DropCache();
@@ -1447,7 +1447,7 @@ void ImpalaServer::CatalogUpdateCallback(
         LOG(INFO) << "Catalog topic update applied with version: " << new_catalog_version
             << " new min catalog object version: " << resp.min_catalog_object_version;
       }
-      ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0);
+      ImpaladMetrics::CATALOG_READY->SetValue(new_catalog_version > 0);
       // TODO: deal with an error status
       discard_result(UpdateCatalogMetrics());
       // Remove all dropped objects from the library cache.
@@ -2130,7 +2130,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
     LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
   }
   services_started_ = true;
-  ImpaladMetrics::IMPALA_SERVER_READY->set_value(true);
+  ImpaladMetrics::IMPALA_SERVER_READY->SetValue(true);
   LOG(INFO) << "Impala has started.";
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/service/session-expiry-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index fa69476..a211701 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -58,8 +58,8 @@ TEST(SessionTest, TestExpiry) {
   IntGauge* hs2_session_metric =
       impala->metrics()->FindMetricForTesting<IntGauge>(
           ImpaladMetricKeys::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS);
-  EXPECT_EQ(expired_metric->value(), 0L);
-  EXPECT_EQ(beeswax_session_metric->value(), 0L);
+  EXPECT_EQ(expired_metric->GetValue(), 0L);
+  EXPECT_EQ(beeswax_session_metric->GetValue(), 0L);
 
   {
     scoped_ptr<ThriftClient<ImpalaServiceClient>> beeswax_clients[NUM_SESSIONS];
@@ -80,16 +80,16 @@ TEST(SessionTest, TestExpiry) {
     }
 
     int64_t start = UnixMillis();
-    while (expired_metric->value() != NUM_SESSIONS * 2 &&
+    while (expired_metric->GetValue() != NUM_SESSIONS * 2 &&
       UnixMillis() - start < MAX_IDLE_TIMEOUT_MS) {
       SleepForMs(100);
     }
 
-    ASSERT_EQ(expired_metric->value(), NUM_SESSIONS * 2)
+    ASSERT_EQ(expired_metric->GetValue(), NUM_SESSIONS * 2)
         << "Sessions did not expire within "<< MAX_IDLE_TIMEOUT_MS / 1000 <<" secs";
-    ASSERT_EQ(beeswax_session_metric->value(), NUM_SESSIONS)
+    ASSERT_EQ(beeswax_session_metric->GetValue(), NUM_SESSIONS)
         << "Beeswax sessions unexpectedly closed after expiration";
-    ASSERT_EQ(hs2_session_metric->value(), NUM_SESSIONS)
+    ASSERT_EQ(hs2_session_metric->GetValue(), NUM_SESSIONS)
         << "HiveServer2 sessions unexpectedly closed after expiration";
 
     TPingImpalaServiceResp resp;

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 678236e..99da183 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -113,7 +113,7 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
       metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")) {
   connected_to_statestore_metric_ =
       metrics_->AddProperty("statestore-subscriber.connected", false);
-  last_recovery_duration_metric_ = metrics_->AddGauge(
+  last_recovery_duration_metric_ = metrics_->AddDoubleGauge(
       "statestore-subscriber.last-recovery-duration", 0.0);
   last_recovery_time_metric_ = metrics_->AddProperty<string>(
       "statestore-subscriber.last-recovery-time", "N/A");
@@ -164,12 +164,12 @@ Status StatestoreSubscriber::Register() {
   RETURN_IF_ERROR(client.DoRpc(&StatestoreServiceClientWrapper::RegisterSubscriber,
       request, &response));
   Status status = Status(response.status);
-  if (status.ok()) connected_to_statestore_metric_->set_value(true);
+  if (status.ok()) connected_to_statestore_metric_->SetValue(true);
   if (response.__isset.registration_id) {
     lock_guard<mutex> l(registration_id_lock_);
     registration_id_ = response.registration_id;
     const string& registration_string = PrintId(registration_id_);
-    registration_id_metric_->set_value(registration_string);
+    registration_id_metric_->SetValue(registration_string);
     VLOG(1) << "Subscriber registration ID: " << registration_string;
   } else {
     VLOG(1) << "No subscriber registration ID received from statestore";
@@ -243,7 +243,7 @@ void StatestoreSubscriber::RecoveryModeChecker() {
       lock_guard<mutex> l(lock_);
       MonotonicStopWatch recovery_timer;
       recovery_timer.Start();
-      connected_to_statestore_metric_->set_value(false);
+      connected_to_statestore_metric_->SetValue(false);
       LOG(INFO) << subscriber_id_
                 << ": Connection with statestore lost, entering recovery mode";
       uint32_t attempt_count = 1;
@@ -265,7 +265,7 @@ void StatestoreSubscriber::RecoveryModeChecker() {
                        << status.GetDetail();
           SleepForMs(SLEEP_INTERVAL_MS);
         }
-        last_recovery_duration_metric_->set_value(
+        last_recovery_duration_metric_->SetValue(
             recovery_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
       }
       // When we're successful in re-registering, we don't do anything
@@ -273,9 +273,9 @@ void StatestoreSubscriber::RecoveryModeChecker() {
       // responsibility of individual clients to post missing updates
       // back to the statestore. This saves a lot of complexity where
       // we would otherwise have to cache updates here.
-      last_recovery_duration_metric_->set_value(
+      last_recovery_duration_metric_->SetValue(
           recovery_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-      last_recovery_time_metric_->set_value(CurrentTimeString());
+      last_recovery_time_metric_->SetValue(CurrentTimeString());
     }
 
     SleepForMs(SLEEP_INTERVAL_MS);

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index d0a4851..b135e38 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -236,13 +236,12 @@ Statestore::Statestore(MetricGroup* metrics)
         FLAGS_statestore_max_missed_heartbeats / 2)) {
 
   DCHECK(metrics != NULL);
-  num_subscribers_metric_ =
-      metrics->AddGauge<int64_t>(STATESTORE_LIVE_SUBSCRIBERS, 0);
+  num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
   subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics,
       STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>());
-  key_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0);
-  value_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0);
-  topic_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0);
+  key_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0);
+  value_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0);
+  topic_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0);
 
   topic_update_duration_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_UPDATE_DURATION);
@@ -398,7 +397,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     subscribers_.insert(make_pair(subscriber_id, current_registration));
     failure_detector_->UpdateHeartbeat(
         PrintId(current_registration->registration_id()), true);
-    num_subscribers_metric_->set_value(subscribers_.size());
+    num_subscribers_metric_->SetValue(subscribers_.size());
     subscriber_set_metric_->Add(subscriber_id);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/common-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/common-metrics.cc b/be/src/util/common-metrics.cc
index d147862..114e0e0 100644
--- a/be/src/util/common-metrics.cc
+++ b/be/src/util/common-metrics.cc
@@ -33,7 +33,7 @@ void CommonMetrics::InitCommonMetrics(MetricGroup* metric_group) {
   KUDU_CLIENT_VERSION = metric_group->AddProperty<string>(
       KUDU_CLIENT_VERSION_METRIC_NAME, kudu::client::GetShortVersionString());
 
-  PROCESS_START_TIME->set_value(CurrentTimeString());
+  PROCESS_START_TIME->SetValue(CurrentTimeString());
 }
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/default-path-handlers.cc
----------------------------------------------------------------------
diff --git a/be/src/util/default-path-handlers.cc b/be/src/util/default-path-handlers.cc
index 88d23f1..10966b4 100644
--- a/be/src/util/default-path-handlers.cc
+++ b/be/src/util/default-path-handlers.cc
@@ -211,7 +211,7 @@ void RootHandler(const Webserver::ArgumentMap& args, Document* document) {
     document->GetAllocator());
 
   if (CommonMetrics::PROCESS_START_TIME != nullptr) {
-    Value process_start_time(CommonMetrics::PROCESS_START_TIME->value().c_str(),
+    Value process_start_time(CommonMetrics::PROCESS_START_TIME->GetValue().c_str(),
       document->GetAllocator());
     document->AddMember("process_start_time", process_start_time,
       document->GetAllocator());

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/impalad-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 1325f2e..18e96a8 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -162,70 +162,70 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
   IMPALA_SERVER_READY = m->AddProperty<bool>(
       ImpaladMetricKeys::IMPALA_SERVER_READY, false);
 
-  IMPALA_SERVER_NUM_QUERIES = m->AddCounter<int64_t>(
+  IMPALA_SERVER_NUM_QUERIES = m->AddCounter(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_QUERIES, 0);
-  NUM_QUERIES_REGISTERED = m->AddGauge<int64_t>(ImpaladMetricKeys::NUM_QUERIES_REGISTERED, 0);
-  NUM_QUERIES_EXPIRED = m->AddCounter<int64_t>(
+  NUM_QUERIES_REGISTERED = m->AddGauge(
+      ImpaladMetricKeys::NUM_QUERIES_REGISTERED, 0);
+  NUM_QUERIES_EXPIRED = m->AddCounter(
       ImpaladMetricKeys::NUM_QUERIES_EXPIRED, 0);
-  NUM_QUERIES_SPILLED = m->AddCounter<int64_t>(
+  NUM_QUERIES_SPILLED = m->AddCounter(
       ImpaladMetricKeys::NUM_QUERIES_SPILLED, 0);
-  IMPALA_SERVER_NUM_FRAGMENTS = m->AddCounter<int64_t>(
+  IMPALA_SERVER_NUM_FRAGMENTS = m->AddCounter(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS, 0);
   IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT = m->AddGauge(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT, 0L);
-  IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS = m->AddGauge<int64_t>(
+  IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS = m->AddGauge(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS, 0);
-  IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS = m->AddGauge<int64_t>(
+  IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS = m->AddGauge(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS, 0);
-  NUM_SESSIONS_EXPIRED = m->AddCounter<int64_t>(
+  NUM_SESSIONS_EXPIRED = m->AddCounter(
       ImpaladMetricKeys::NUM_SESSIONS_EXPIRED, 0);
-  RESULTSET_CACHE_TOTAL_NUM_ROWS = m->AddGauge<int64_t>(
+  RESULTSET_CACHE_TOTAL_NUM_ROWS = m->AddGauge(
       ImpaladMetricKeys::RESULTSET_CACHE_TOTAL_NUM_ROWS, 0);
-  RESULTSET_CACHE_TOTAL_BYTES = m->AddGauge<int64_t>(
+  RESULTSET_CACHE_TOTAL_BYTES = m->AddGauge(
       ImpaladMetricKeys::RESULTSET_CACHE_TOTAL_BYTES, 0);
 
   // Initialize scan node metrics
-  NUM_RANGES_PROCESSED = m->AddCounter<int64_t>(
+  NUM_RANGES_PROCESSED = m->AddCounter(
       ImpaladMetricKeys::TOTAL_SCAN_RANGES_PROCESSED, 0);
-  NUM_RANGES_MISSING_VOLUME_ID = m->AddCounter<int64_t>(
+  NUM_RANGES_MISSING_VOLUME_ID = m->AddCounter(
       ImpaladMetricKeys::NUM_SCAN_RANGES_MISSING_VOLUME_ID, 0);
 
   // Initialize memory usage metrics
-  MEM_POOL_TOTAL_BYTES = m->AddGauge<int64_t>(
+  MEM_POOL_TOTAL_BYTES = m->AddGauge(
       ImpaladMetricKeys::MEM_POOL_TOTAL_BYTES, 0);
-  HASH_TABLE_TOTAL_BYTES = m->AddGauge<int64_t>(
+  HASH_TABLE_TOTAL_BYTES = m->AddGauge(
       ImpaladMetricKeys::HASH_TABLE_TOTAL_BYTES, 0);
 
   // Initialize insert metrics
-  NUM_FILES_OPEN_FOR_INSERT = m->AddGauge<int64_t>(
+  NUM_FILES_OPEN_FOR_INSERT = m->AddGauge(
       ImpaladMetricKeys::NUM_FILES_OPEN_FOR_INSERT, 0);
 
   // Initialize IO mgr metrics
-  IO_MGR_NUM_OPEN_FILES = m->AddGauge<int64_t>(
-      ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
-  IO_MGR_NUM_BUFFERS = m->AddGauge<int64_t>(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0);
-  IO_MGR_TOTAL_BYTES = m->AddGauge<int64_t>(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0);
-  IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge<int64_t>(
+  IO_MGR_NUM_OPEN_FILES = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
+  IO_MGR_NUM_BUFFERS = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0);
+  IO_MGR_TOTAL_BYTES = m->AddGauge(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0);
+  IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS, 0);
-  IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge<int64_t>(
+  IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES, 0);
-  IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge<int64_t>(
+  IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING, 0);
 
-  IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = m->AddGauge<int64_t>(
+  IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT, 0);
 
-  IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = m->AddGauge<int64_t>(
+  IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT, 0);
 
-  IO_MGR_BYTES_READ = m->AddCounter<int64_t>(ImpaladMetricKeys::IO_MGR_BYTES_READ, 0);
-  IO_MGR_LOCAL_BYTES_READ = m->AddCounter<int64_t>(
+  IO_MGR_BYTES_READ = m->AddCounter(ImpaladMetricKeys::IO_MGR_BYTES_READ, 0);
+  IO_MGR_LOCAL_BYTES_READ = m->AddCounter(
       ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0);
-  IO_MGR_CACHED_BYTES_READ = m->AddCounter<int64_t>(
+  IO_MGR_CACHED_BYTES_READ = m->AddCounter(
       ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0);
-  IO_MGR_SHORT_CIRCUIT_BYTES_READ = m->AddCounter<int64_t>(
+  IO_MGR_SHORT_CIRCUIT_BYTES_READ = m->AddCounter(
       ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ, 0);
-  IO_MGR_BYTES_WRITTEN = m->AddCounter<int64_t>(
+  IO_MGR_BYTES_WRITTEN = m->AddCounter(
       ImpaladMetricKeys::IO_MGR_BYTES_WRITTEN, 0);
 
   IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO =
@@ -233,8 +233,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO);
 
   // Initialize catalog metrics
-  CATALOG_NUM_DBS = m->AddGauge<int64_t>(ImpaladMetricKeys::CATALOG_NUM_DBS, 0);
-  CATALOG_NUM_TABLES = m->AddGauge<int64_t>(ImpaladMetricKeys::CATALOG_NUM_TABLES, 0);
+  CATALOG_NUM_DBS = m->AddGauge(ImpaladMetricKeys::CATALOG_NUM_DBS, 0);
+  CATALOG_NUM_TABLES = m->AddGauge(ImpaladMetricKeys::CATALOG_NUM_TABLES, 0);
   CATALOG_READY = m->AddProperty<bool>(ImpaladMetricKeys::CATALOG_READY, false);
 
   // Maximum duration to be tracked by the query durations metric. No particular reasoning
@@ -248,8 +248,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       MetricDefs::Get(ImpaladMetricKeys::DDL_DURATIONS), FIVE_HOURS_IN_MS, 3));
 
   // Initialize Hedged read metrics
-  HEDGED_READ_OPS = m->AddCounter<int64_t>(ImpaladMetricKeys::HEDGED_READ_OPS, 0);
-  HEDGED_READ_OPS_WIN = m->AddCounter<int64_t>(ImpaladMetricKeys::HEDGED_READ_OPS_WIN, 0);
+  HEDGED_READ_OPS = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS, 0);
+  HEDGED_READ_OPS_WIN = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS_WIN, 0);
 
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index 3308bf4..fd78343 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -32,7 +32,7 @@ using namespace strings;
 
 DECLARE_bool(mmap_buffers);
 
-SumGauge<int64_t>* AggregateMemoryMetrics::TOTAL_USED = nullptr;
+SumGauge* AggregateMemoryMetrics::TOTAL_USED = nullptr;
 IntGauge* AggregateMemoryMetrics::NUM_MAPS = nullptr;
 IntGauge* AggregateMemoryMetrics::MAPPED_BYTES = nullptr;
 IntGauge* AggregateMemoryMetrics::RSS = nullptr;
@@ -110,19 +110,19 @@ Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_met
 #endif
   MetricGroup* aggregate_metrics = metrics->GetOrCreateChildGroup("memory");
   AggregateMemoryMetrics::TOTAL_USED = aggregate_metrics->RegisterMetric(
-      new SumGauge<int64_t>(MetricDefs::Get("memory.total-used"), used_metrics));
+      new SumGauge(MetricDefs::Get("memory.total-used"), used_metrics));
   if (register_jvm_metrics) {
     RETURN_IF_ERROR(JvmMetric::InitMetrics(metrics->GetOrCreateChildGroup("jvm")));
   }
 
   if (MemInfo::HaveSmaps()) {
     AggregateMemoryMetrics::NUM_MAPS =
-        aggregate_metrics->AddGauge<int64_t>("memory.num-maps", 0U);
+        aggregate_metrics->AddGauge("memory.num-maps", 0U);
     AggregateMemoryMetrics::MAPPED_BYTES =
-        aggregate_metrics->AddGauge<int64_t>("memory.mapped-bytes", 0U);
-    AggregateMemoryMetrics::RSS = aggregate_metrics->AddGauge<int64_t>("memory.rss", 0U);
+        aggregate_metrics->AddGauge("memory.mapped-bytes", 0U);
+    AggregateMemoryMetrics::RSS = aggregate_metrics->AddGauge("memory.rss", 0U);
     AggregateMemoryMetrics::ANON_HUGE_PAGE_BYTES =
-        aggregate_metrics->AddGauge<int64_t>("memory.anon-huge-page-bytes", 0U);
+        aggregate_metrics->AddGauge("memory.anon-huge-page-bytes", 0U);
   }
   ThpConfig thp_config = MemInfo::ParseThpConfig();
   AggregateMemoryMetrics::THP_ENABLED =
@@ -139,16 +139,16 @@ void AggregateMemoryMetrics::Refresh() {
   if (NUM_MAPS != nullptr) {
     // Only call ParseSmaps() if the metrics were created.
     MappedMemInfo map_info = MemInfo::ParseSmaps();
-    NUM_MAPS->set_value(map_info.num_maps);
-    MAPPED_BYTES->set_value(map_info.size_kb * 1024);
-    RSS->set_value(map_info.rss_kb * 1024);
-    ANON_HUGE_PAGE_BYTES->set_value(map_info.anon_huge_pages_kb * 1024);
+    NUM_MAPS->SetValue(map_info.num_maps);
+    MAPPED_BYTES->SetValue(map_info.size_kb * 1024);
+    RSS->SetValue(map_info.rss_kb * 1024);
+    ANON_HUGE_PAGE_BYTES->SetValue(map_info.anon_huge_pages_kb * 1024);
   }
 
   ThpConfig thp_config = MemInfo::ParseThpConfig();
-  THP_ENABLED->set_value(thp_config.enabled);
-  THP_DEFRAG->set_value(thp_config.defrag);
-  THP_KHUGEPAGED_DEFRAG->set_value(thp_config.khugepaged_defrag);
+  THP_ENABLED->SetValue(thp_config.enabled);
+  THP_DEFRAG->SetValue(thp_config.defrag);
+  THP_KHUGEPAGED_DEFRAG->SetValue(thp_config.khugepaged_defrag);
 }
 
 JvmMetric* JvmMetric::CreateAndRegister(MetricGroup* metrics, const string& key,
@@ -192,35 +192,36 @@ Status JvmMetric::InitMetrics(MetricGroup* metrics) {
   return Status::OK();
 }
 
-void JvmMetric::CalculateValue() {
+int64_t JvmMetric::GetValue() {
   TGetJvmMetricsRequest request;
   request.get_all = false;
   request.__set_memory_pool(mempool_name_);
   TGetJvmMetricsResponse response;
-  if (!JniUtil::GetJvmMetrics(request, &response).ok()) return;
-  if (response.memory_pools.size() != 1) return;
+  if (!JniUtil::GetJvmMetrics(request, &response).ok()) return 0;
+  if (response.memory_pools.size() != 1) return 0;
   TJvmMemoryPool& pool = response.memory_pools[0];
   DCHECK(pool.name == mempool_name_);
   switch (metric_type_) {
-    case MAX: value_ = pool.max;
-      return;
-    case INIT: value_ = pool.init;
-      return;
-    case CURRENT: value_ = pool.used;
-      return;
-    case COMMITTED: value_ = pool.committed;
-      return;
-    case PEAK_MAX: value_ = pool.peak_max;
-      return;
-    case PEAK_INIT: value_ = pool.peak_init;
-      return;
-    case PEAK_CURRENT: value_ = pool.peak_used;
-      return;
-    case PEAK_COMMITTED: value_ = pool.peak_committed;
-      return;
+    case MAX:
+      return pool.max;
+    case INIT:
+      return pool.init;
+    case CURRENT:
+      return pool.used;
+    case COMMITTED:
+      return pool.committed;
+    case PEAK_MAX:
+      return pool.peak_max;
+    case PEAK_INIT:
+      return pool.peak_init;
+    case PEAK_CURRENT:
+      return pool.peak_used;
+    case PEAK_COMMITTED:
+      return pool.peak_committed;
     default:
       DCHECK(false) << "Unknown JvmMetricType: " << metric_type_;
   }
+  return 0;
 }
 
 Status BufferPoolMetric::InitMetrics(MetricGroup* metrics,
@@ -263,47 +264,39 @@ BufferPoolMetric::BufferPoolMetric(const TMetricDef& def, BufferPoolMetricType t
     global_reservations_(global_reservations),
     buffer_pool_(buffer_pool) {}
 
-void BufferPoolMetric::CalculateValue() {
+int64_t BufferPoolMetric::GetValue() {
   // IMPALA-6362: we have to be careful that none of the below calls to ReservationTracker
   // methods acquire ReservationTracker::lock_ to avoid a potential circular dependency
   // with MemTracker::child_trackers_lock_, which may be held when refreshing MemTracker
   // consumption.
   switch (type_) {
     case BufferPoolMetricType::LIMIT:
-      value_ = buffer_pool_->GetSystemBytesLimit();
-      break;
+      return buffer_pool_->GetSystemBytesLimit();
     case BufferPoolMetricType::SYSTEM_ALLOCATED:
-      value_ = buffer_pool_->GetSystemBytesAllocated();
-      break;
+      return buffer_pool_->GetSystemBytesAllocated();
     case BufferPoolMetricType::RESERVED:
-      value_ = global_reservations_->GetReservation();
-      break;
+      return global_reservations_->GetReservation();
     case BufferPoolMetricType::UNUSED_RESERVATION_BYTES: {
       // Estimate the unused reservation based on other aggregate values, defined as
       // the total bytes of reservation where there is no corresponding buffer in use
       // by a client. Buffers are either in-use, free buffers, or attached to clean pages.
       int64_t total_used_reservation = buffer_pool_->GetSystemBytesAllocated()
-        - buffer_pool_->GetFreeBufferBytes()
-        - buffer_pool_->GetCleanPageBytes();
-      value_ = global_reservations_->GetReservation() - total_used_reservation;
-      break;
+          - buffer_pool_->GetFreeBufferBytes()
+          - buffer_pool_->GetCleanPageBytes();
+      return global_reservations_->GetReservation() - total_used_reservation;
     }
     case BufferPoolMetricType::NUM_FREE_BUFFERS:
-      value_ = buffer_pool_->GetNumFreeBuffers();
-      break;
+      return buffer_pool_->GetNumFreeBuffers();
     case BufferPoolMetricType::FREE_BUFFER_BYTES:
-      value_ = buffer_pool_->GetFreeBufferBytes();
-      break;
+      return buffer_pool_->GetFreeBufferBytes();
     case BufferPoolMetricType::CLEAN_PAGES_LIMIT:
-      value_ = buffer_pool_->GetCleanPageBytesLimit();
-      break;
+      return buffer_pool_->GetCleanPageBytesLimit();
     case BufferPoolMetricType::NUM_CLEAN_PAGES:
-      value_ = buffer_pool_->GetNumCleanPages();
-      break;
+      return buffer_pool_->GetNumCleanPages();
     case BufferPoolMetricType::CLEAN_PAGE_BYTES:
-      value_ = buffer_pool_->GetCleanPageBytes();
-      break;
+      return buffer_pool_->GetCleanPageBytes();
     default:
       DCHECK(false) << "Unknown BufferPoolMetricType: " << static_cast<int>(type_);
   }
+  return 0;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 3294c30..6c10e09 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -44,7 +44,7 @@ class AggregateMemoryMetrics {
   /// including JVM memory), which is either in use by queries or cached by the BufferPool
   /// or the malloc implementation.
   /// TODO: IMPALA-691 - consider changing this to include JVM memory.
-  static SumGauge<int64_t>* TOTAL_USED;
+  static SumGauge* TOTAL_USED;
 
   /// The total number of virtual memory regions for the process.
   /// The value must be refreshed by calling Refresh().
@@ -106,9 +106,8 @@ class TcmallocMetric : public IntGauge {
    public:
     PhysicalBytesMetric(const TMetricDef& def) : IntGauge(def, 0) { }
 
-   private:
-    virtual void CalculateValue() {
-      value_ = TOTAL_BYTES_RESERVED->value() - PAGEHEAP_UNMAPPED_BYTES->value();
+    virtual int64_t GetValue() override {
+      return TOTAL_BYTES_RESERVED->GetValue() - PAGEHEAP_UNMAPPED_BYTES->GetValue();
     }
   };
 
@@ -117,20 +116,21 @@ class TcmallocMetric : public IntGauge {
   static TcmallocMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
       const std::string& tcmalloc_var);
 
+  virtual int64_t GetValue() override {
+    int64_t retval = 0;
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
+    MallocExtension::instance()->GetNumericProperty(tcmalloc_var_.c_str(),
+        reinterpret_cast<size_t*>(&retval));
+#endif
+    return retval;
+  }
+
  private:
   /// Name of the tcmalloc property this metric should fetch.
   const std::string tcmalloc_var_;
 
   TcmallocMetric(const TMetricDef& def, const std::string& tcmalloc_var)
-      : IntGauge(def, 0), tcmalloc_var_(tcmalloc_var) { }
-
-  virtual void CalculateValue() {
-#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
-    DCHECK_EQ(sizeof(size_t), sizeof(value_));
-    MallocExtension::instance()->GetNumericProperty(tcmalloc_var_.c_str(),
-        reinterpret_cast<size_t*>(&value_));
-#endif
-  }
+    : IntGauge(def, 0), tcmalloc_var_(tcmalloc_var) { }
 };
 
 /// Alternative to TCMallocMetric if we're running under a sanitizer that replaces
@@ -138,12 +138,16 @@ class TcmallocMetric : public IntGauge {
 class SanitizerMallocMetric : public IntGauge {
  public:
   SanitizerMallocMetric(const TMetricDef& def) : IntGauge(def, 0) {}
+
   static SanitizerMallocMetric* BYTES_ALLOCATED;
- private:
-  virtual void CalculateValue() override {
+
+  virtual int64_t GetValue() override {
 #if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
-    value_ = __sanitizer_get_current_allocated_bytes();
+    return __sanitizer_get_current_allocated_bytes();
+#else
+    return 0;
 #endif
+
   }
 };
 
@@ -157,10 +161,9 @@ class JvmMetric : public IntGauge {
   /// pool (usually ~5 pools plus a synthetic 'total' pool).
   static Status InitMetrics(MetricGroup* metrics) WARN_UNUSED_RESULT;
 
- protected:
   /// Searches through jvm_metrics_response_ for a matching memory pool and pulls out the
   /// right value from that structure according to metric_type_.
-  virtual void CalculateValue();
+  virtual int64_t GetValue() override;
 
  private:
   /// Each names one of the fields in TJvmMemoryPool.
@@ -206,8 +209,7 @@ class BufferPoolMetric : public IntGauge {
   static BufferPoolMetric* NUM_CLEAN_PAGES;
   static BufferPoolMetric* CLEAN_PAGE_BYTES;
 
- protected:
-  virtual void CalculateValue();
+  virtual int64_t GetValue() override;
 
  private:
   friend class ReservationTrackerTest;

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc
index 0126281..bfbfdfe 100644
--- a/be/src/util/metrics-test.cc
+++ b/be/src/util/metrics-test.cc
@@ -36,7 +36,7 @@ namespace impala {
 template <typename M, typename T>
 void AssertValue(M* metric, const T& value,
     const string& human_readable) {
-  EXPECT_EQ(metric->value(), value);
+  EXPECT_EQ(metric->GetValue(), value);
   if (!human_readable.empty()) {
     EXPECT_EQ(metric->ToHumanReadable(), human_readable);
   }
@@ -73,36 +73,36 @@ class MetricsTest : public testing::Test {
 TEST_F(MetricsTest, CounterMetrics) {
   MetricGroup metrics("CounterMetrics");
   AddMetricDef("counter", TMetricKind::COUNTER, TUnit::UNIT);
-  IntCounter* int_counter = metrics.AddCounter<int64_t>("counter", 0);
+  IntCounter* int_counter = metrics.AddCounter("counter", 0);
   AssertValue(int_counter, 0, "0");
   int_counter->Increment(1);
   AssertValue(int_counter, 1, "1");
   int_counter->Increment(10);
   AssertValue(int_counter, 11, "11");
-  int_counter->set_value(3456);
+  int_counter->SetValue(3456);
   AssertValue(int_counter, 3456, "3.46K");
 
   AddMetricDef("counter_with_units", TMetricKind::COUNTER, TUnit::BYTES);
   IntCounter* int_counter_with_units =
-      metrics.AddCounter<int64_t>("counter_with_units", 10);
+      metrics.AddCounter("counter_with_units", 10);
   AssertValue(int_counter_with_units, 10, "10.00 B");
 }
 
 TEST_F(MetricsTest, GaugeMetrics) {
   MetricGroup metrics("GaugeMetrics");
   AddMetricDef("gauge", TMetricKind::GAUGE, TUnit::NONE);
-  IntGauge* int_gauge = metrics.AddGauge<int64_t>("gauge", 0);
+  IntGauge* int_gauge = metrics.AddGauge("gauge", 0);
   AssertValue(int_gauge, 0, "0");
   int_gauge->Increment(-1);
   AssertValue(int_gauge, -1, "-1");
   int_gauge->Increment(10);
   AssertValue(int_gauge, 9, "9");
-  int_gauge->set_value(3456);
+  int_gauge->SetValue(3456);
   AssertValue(int_gauge, 3456, "3456");
 
   AddMetricDef("gauge_with_units", TMetricKind::GAUGE, TUnit::TIME_S);
   IntGauge* int_gauge_with_units =
-      metrics.AddGauge<int64_t>("gauge_with_units", 10);
+      metrics.AddGauge("gauge_with_units", 10);
   AssertValue(int_gauge_with_units, 10, "10s000ms");
 }
 
@@ -111,12 +111,12 @@ TEST_F(MetricsTest, SumGauge) {
   AddMetricDef("gauge1", TMetricKind::GAUGE, TUnit::NONE);
   AddMetricDef("gauge2", TMetricKind::GAUGE, TUnit::NONE);
   AddMetricDef("sum", TMetricKind::GAUGE, TUnit::NONE);
-  IntGauge* gauge1 = metrics.AddGauge<int64_t>("gauge1", 0);
-  IntGauge* gauge2 = metrics.AddGauge<int64_t>("gauge2", 0);
+  IntGauge* gauge1 = metrics.AddGauge("gauge1", 0);
+  IntGauge* gauge2 = metrics.AddGauge("gauge2", 0);
 
   vector<IntGauge*> gauges({gauge1, gauge2});
   IntGauge* sum_gauge =
-      metrics.RegisterMetric(new SumGauge<int64_t>(MetricDefs::Get("sum"), gauges));
+      metrics.RegisterMetric(new SumGauge(MetricDefs::Get("sum"), gauges));
 
   AssertValue(sum_gauge, 0, "0");
   gauge1->Increment(1);
@@ -132,14 +132,14 @@ TEST_F(MetricsTest, PropertyMetrics) {
   AddMetricDef("bool_property", TMetricKind::PROPERTY, TUnit::NONE);
   BooleanProperty* bool_property = metrics.AddProperty("bool_property", false);
   AssertValue(bool_property, false, "false");
-  bool_property->set_value(true);
+  bool_property->SetValue(true);
   AssertValue(bool_property, true, "true");
 
   AddMetricDef("string_property", TMetricKind::PROPERTY, TUnit::NONE);
   StringProperty* string_property = metrics.AddProperty("string_property",
       string("string1"));
   AssertValue(string_property, "string1", "string1");
-  string_property->set_value("string2");
+  string_property->SetValue("string2");
   AssertValue(string_property, "string2", "string2");
 }
 
@@ -147,11 +147,11 @@ TEST_F(MetricsTest, NonFiniteValues) {
   MetricGroup metrics("NanValues");
   AddMetricDef("inf_value", TMetricKind::GAUGE, TUnit::NONE);
   double inf = numeric_limits<double>::infinity();
-  DoubleGauge* gauge = metrics.AddGauge("inf_value", inf);
+  DoubleGauge* gauge = metrics.AddDoubleGauge("inf_value", inf);
   AssertValue(gauge, inf, "inf");
   double nan = numeric_limits<double>::quiet_NaN();
-  gauge->set_value(nan);
-  EXPECT_TRUE(std::isnan(gauge->value()));
+  gauge->SetValue(nan);
+  EXPECT_TRUE(std::isnan(gauge->GetValue()));
   EXPECT_TRUE(gauge->ToHumanReadable() == "nan");
 }
 
@@ -223,19 +223,19 @@ TEST_F(MetricsTest, MemMetric) {
       metrics.FindMetricForTesting<IntGauge>("tcmalloc.bytes-in-use");
   ASSERT_TRUE(bytes_in_use != NULL);
 
-  uint64_t cur_in_use = bytes_in_use->value();
+  uint64_t cur_in_use = bytes_in_use->GetValue();
   EXPECT_GT(cur_in_use, 0);
 
   // Allocate 100MB to increase the number of bytes used. TCMalloc may also give up some
   // bytes during this allocation, so this allocation is deliberately large to ensure that
   // the bytes used metric goes up net.
   scoped_ptr<vector<uint64_t>> chunk(new vector<uint64_t>(100 * 1024 * 1024));
-  EXPECT_GT(bytes_in_use->value(), cur_in_use);
+  EXPECT_GT(bytes_in_use->GetValue(), cur_in_use);
 
   IntGauge* total_bytes_reserved =
       metrics.FindMetricForTesting<IntGauge>("tcmalloc.total-bytes-reserved");
   ASSERT_TRUE(total_bytes_reserved != NULL);
-  ASSERT_GT(total_bytes_reserved->value(), 0);
+  ASSERT_GT(total_bytes_reserved->GetValue(), 0);
 
   IntGauge* pageheap_free_bytes =
       metrics.FindMetricForTesting<IntGauge>("tcmalloc.pageheap-free-bytes");
@@ -254,12 +254,12 @@ TEST_F(MetricsTest, JvmMetrics) {
       metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<IntGauge>(
           "jvm.total.current-usage-bytes");
   ASSERT_TRUE(jvm_total_used != NULL);
-  EXPECT_GT(jvm_total_used->value(), 0);
+  EXPECT_GT(jvm_total_used->GetValue(), 0);
   IntGauge* jvm_peak_total_used =
       metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<IntGauge>(
           "jvm.total.peak-current-usage-bytes");
   ASSERT_TRUE(jvm_peak_total_used != NULL);
-  EXPECT_GT(jvm_peak_total_used->value(), 0);
+  EXPECT_GT(jvm_peak_total_used->GetValue(), 0);
 }
 
 void AssertJson(const Value& val, const string& name, const string& value,
@@ -274,7 +274,7 @@ void AssertJson(const Value& val, const string& name, const string& value,
 TEST_F(MetricsTest, CountersJson) {
   MetricGroup metrics("CounterMetrics");
   AddMetricDef("counter", TMetricKind::COUNTER, TUnit::UNIT, "description");
-  metrics.AddCounter<int64_t>("counter", 0);
+  metrics.AddCounter("counter", 0);
   Document document;
   Value val;
   metrics.ToJson(true, &document, &val);
@@ -286,7 +286,7 @@ TEST_F(MetricsTest, CountersJson) {
 TEST_F(MetricsTest, GaugesJson) {
   MetricGroup metrics("GaugeMetrics");
   AddMetricDef("gauge", TMetricKind::GAUGE, TUnit::NONE);
-  metrics.AddGauge<int64_t>("gauge", 10);
+  metrics.AddGauge("gauge", 10);
   Document document;
   Value val;
   metrics.ToJson(true, &document, &val);

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 12d6df3..b513c1e 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -27,6 +27,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/locks.hpp>
 
+#include "common/atomic.h"
 #include "common/logging.h"
 #include "common/object-pool.h"
 #include "common/status.h"
@@ -118,59 +119,37 @@ class Metric {
   void AddStandardFields(rapidjson::Document* document, rapidjson::Value* val);
 };
 
-/// A SimpleMetric has a value which is a simple primitive type: e.g. integers, strings and
-/// floats. It is parameterised not only by the type of its value, but by both the unit
-/// (e.g. bytes/s), drawn from TUnit and the 'kind' of the metric itself. The kind
-/// can be one of: 'gauge', which may increase or decrease over time, a 'counter' which is
-/// increasing only over time, or a 'property' which is not numeric.
-//
-/// SimpleMetrics return their current value through the value() method. Access to value()
-/// is thread-safe.
-//
-/// TODO: We can use type traits to select a more efficient lock-free implementation of
-/// value() etc. where it is safe to do so.
-/// TODO: CalculateValue() can be returning a value, its current interface is not clean.
-template<typename T, TMetricKind::type metric_kind=TMetricKind::GAUGE>
-class SimpleMetric : public Metric {
+/// A ScalarMetric has a value which is a simple primitive type: e.g. integers, strings
+/// and floats. It is parameterised not only by the type of its value, but by both the
+/// unit (e.g. bytes/s), drawn from TUnit and the 'kind' of the metric itself.
+/// The kind can be one of:
+/// - 'gauge', which may increase or decrease over time,
+/// - 'counter' which can only increase over time
+/// - 'property' which is a value store which can be read and written only
+///
+/// Note that management software may use the metric kind as hint on how to display
+/// the value. ScalarMetrics return their current value through the GetValue() method
+/// and set/initialize the value with SetValue(). Both methods are thread safe.
+template<typename T, TMetricKind::type metric_kind_t>
+class ScalarMetric: public Metric {
  public:
-  SimpleMetric(const TMetricDef& metric_def, const T& initial_value)
-      : Metric(metric_def), unit_(metric_def.units), value_(initial_value) {
-    DCHECK_EQ(metric_kind, metric_def.kind) << "Metric kind does not match definition: "
+  ScalarMetric(const TMetricDef& metric_def)
+    : Metric(metric_def), unit_(metric_def.units) {
+    DCHECK_EQ(metric_kind_t, metric_def.kind) << "Metric kind does not match definition: "
         << metric_def.key;
   }
 
-  virtual ~SimpleMetric() { }
-
-  /// Returns the current value, updating it if necessary. Thread-safe.
-  T value() {
-    boost::lock_guard<SpinLock> l(lock_);
-    CalculateValue();
-    return value_;
-  }
-
-  /// Sets the current value. Thread-safe.
-  void set_value(const T& value) {
-    boost::lock_guard<SpinLock> l(lock_);
-    value_ = value;
-  }
+  virtual ~ScalarMetric() { }
 
-  /// Adds 'delta' to the current value atomically.
-  void Increment(const T& delta) {
-    DCHECK(kind() != TMetricKind::PROPERTY)
-        << "Can't change value of PROPERTY metric: " << key();
-    DCHECK(kind() != TMetricKind::COUNTER || delta >= 0)
-        << "Can't decrement value of COUNTER metric: " << key();
-    if (delta == 0) return;
-    boost::lock_guard<SpinLock> l(lock_);
-    value_ += delta;
-  }
+  /// Returns the current value. Thread-safe.
+  virtual T GetValue() = 0;
 
-  virtual void ToJson(rapidjson::Document* document, rapidjson::Value* val) {
+  virtual void ToJson(rapidjson::Document* document, rapidjson::Value* val) override {
     rapidjson::Value container(rapidjson::kObjectType);
     AddStandardFields(document, &container);
 
     rapidjson::Value metric_value;
-    ToJsonValue(value(), TUnit::NONE, document, &metric_value);
+    ToJsonValue(GetValue(), TUnit::NONE, document, &metric_value);
     container.AddMember("value", metric_value, document->GetAllocator());
 
     rapidjson::Value type_value(PrintTMetricKind(kind()).c_str(),
@@ -181,30 +160,46 @@ class SimpleMetric : public Metric {
     *val = container;
   }
 
-  virtual std::string ToHumanReadable() {
-    return PrettyPrinter::Print(value(), unit());
+  virtual std::string ToHumanReadable() override {
+    return PrettyPrinter::Print(GetValue(), unit());
   }
 
-  virtual void ToLegacyJson(rapidjson::Document* document) {
+  virtual void ToLegacyJson(rapidjson::Document* document) override {
     rapidjson::Value val;
-    ToJsonValue(value(), TUnit::NONE, document, &val);
+    ToJsonValue(GetValue(), TUnit::NONE, document, &val);
     document->AddMember(key_.c_str(), val, document->GetAllocator());
   }
 
   TUnit::type unit() const { return unit_; }
-  TMetricKind::type kind() const { return metric_kind; }
+  TMetricKind::type kind() const { return metric_kind_t; }
 
  protected:
-  /// Called to compute value_ if necessary during calls to value(). The more natural
-  /// approach would be to have virtual T value(), but that's not possible in C++.
-  //
-  /// TODO: Should be cheap to have a blank implementation, but if required we can cause
-  /// the compiler to avoid calling this entirely through a compile-time constant.
-  virtual void CalculateValue() { }
-
   /// Units of this metric.
   const TUnit::type unit_;
+};
 
+/// An implementation of scalar metric with spinlock.
+template<typename T, TMetricKind::type metric_kind_t>
+class LockedMetric : public ScalarMetric<T, metric_kind_t> {
+ public:
+  LockedMetric(const TMetricDef& metric_def, const T& initial_value)
+    : ScalarMetric<T, metric_kind_t>(metric_def), value_(initial_value) {}
+
+  virtual ~LockedMetric() {}
+
+  /// Atomically reads the current value.
+  virtual T GetValue() override {
+    boost::lock_guard<SpinLock> l(lock_);
+    return value_;
+  }
+
+  /// Atomically sets the value.
+  void SetValue(const T& value) {
+    boost::lock_guard<SpinLock> l(lock_);
+    value_ = value;
+  }
+
+ protected:
   /// Guards access to value_.
   SpinLock lock_;
 
@@ -212,42 +207,81 @@ class SimpleMetric : public Metric {
   T value_;
 };
 
-// Gauge metric that computes the sum of several gauges.
-template <typename T>
-class SumGauge : public SimpleMetric<T, TMetricKind::GAUGE> {
+typedef class LockedMetric<bool, TMetricKind::PROPERTY> BooleanProperty;
+typedef class LockedMetric<std::string,TMetricKind::PROPERTY> StringProperty;
+typedef class LockedMetric<double, TMetricKind::GAUGE> DoubleGauge;
+
+/// An implementation of 'gauge' or 'counter' metric kind. The metric can be incremented
+/// atomically via the Increment() interface.
+template<TMetricKind::type metric_kind_t>
+class AtomicMetric : public ScalarMetric<int64_t, metric_kind_t> {
  public:
-  SumGauge(const TMetricDef& metric_def,
-      const std::vector<SimpleMetric<T, TMetricKind::GAUGE>*>& metrics)
-    : SimpleMetric<T, TMetricKind::GAUGE>(metric_def, 0), metrics_(metrics) {}
+  AtomicMetric(const TMetricDef& metric_def, const int64_t initial_value)
+    : ScalarMetric<int64_t, metric_kind_t>(metric_def), value_(initial_value) {
+    DCHECK(metric_kind_t == TMetricKind::GAUGE || metric_kind_t == TMetricKind::COUNTER);
+  }
+
+  virtual ~AtomicMetric() {}
+
+  /// Atomically reads the current value. May be overridden by derived classes.
+  /// The default implementation just atomically loads 'value_'. Derived classes
+  /// which derive the return value from mutliple sources other than 'value_'
+  /// need to take care of synchronization among sources.
+  virtual int64_t GetValue() override { return value_.Load(); }
+
+  /// Atomically sets the value.
+  void SetValue(const int64_t& value) { value_.Store(value); }
+
+  /// Adds 'delta' to the current value atomically.
+  void Increment(int64_t delta) {
+    DCHECK(metric_kind_t != TMetricKind::COUNTER || delta >= 0)
+        << "Can't decrement value of COUNTER metric: " << this->key();
+    value_.Add(delta);
+  }
+
+ protected:
+  /// The current value of the metric.
+  AtomicInt64 value_;
+};
+
+/// We write 'Int' as a placeholder for all integer types.
+typedef class AtomicMetric<TMetricKind::GAUGE> IntGauge;
+typedef class AtomicMetric<TMetricKind::COUNTER> IntCounter;
+
+/// Gauge metric that computes the sum of several gauges.
+class SumGauge : public IntGauge {
+ public:
+  SumGauge(const TMetricDef& metric_def, const std::vector<IntGauge*>& gauges)
+    : IntGauge(metric_def, 0), gauges_(gauges) {}
+
   virtual ~SumGauge() {}
 
- private:
-  virtual void CalculateValue() override {
-    T sum = 0;
-    for (SimpleMetric<T, TMetricKind::GAUGE>* metric : metrics_) sum += metric->value();
-    this->value_ = sum;
+  virtual int64_t GetValue() override {
+    // Note that this doesn't hold the locks of all gauages before computing the sum so
+    // it's possible for one of the gauages to change after being read and added to sum.
+    int64_t sum = 0;
+    for (auto gauge : gauges_) sum += gauge->GetValue();
+    return sum;
   }
 
-  /// The metrics to be summed.
-  std::vector<SimpleMetric<T, TMetricKind::GAUGE>*> metrics_;
+ private:
+  /// The gauges to be summed.
+  std::vector<IntGauge*> gauges_;
 };
 
-// Gauge metric that negates another gauge.
-template <typename T>
-class NegatedGauge : public SimpleMetric<T, TMetricKind::GAUGE> {
+/// Gauge metric that negates another gauge.
+class NegatedGauge : public IntGauge {
  public:
-  NegatedGauge(const TMetricDef& metric_def,
-      SimpleMetric<T, TMetricKind::GAUGE>* metric)
-    : SimpleMetric<T, TMetricKind::GAUGE>(metric_def, 0), metric_(metric) {}
+  NegatedGauge(const TMetricDef& metric_def, IntGauge* gauge)
+    : IntGauge(metric_def, 0), gauge_(gauge) {}
+
   virtual ~NegatedGauge() {}
 
- private:
-  virtual void CalculateValue() override {
-    this->value_ = -metric_->value();
-  }
+  virtual int64_t GetValue() override { return -gauge_->GetValue(); }
 
+ private:
   /// The metric to be negated.
-  SimpleMetric<T, TMetricKind::GAUGE>* metric_;
+  IntGauge* gauge_;
 };
 
 /// Container for a set of metrics. A MetricGroup owns the memory for every metric
@@ -285,27 +319,28 @@ class MetricGroup {
   }
 
   /// Create a gauge metric object with given key and initial value (owned by this object)
-  template<typename T>
-  SimpleMetric<T>* AddGauge(const std::string& key, const T& value,
+  IntGauge* AddGauge(const std::string& key, const int64_t value,
       const std::string& metric_def_arg = "") {
-    return RegisterMetric(new SimpleMetric<T, TMetricKind::GAUGE>(
-        MetricDefs::Get(key, metric_def_arg), value));
+    return RegisterMetric(new IntGauge(MetricDefs::Get(key, metric_def_arg), value));
   }
 
-  template<typename T>
-  SimpleMetric<T, TMetricKind::PROPERTY>* AddProperty(const std::string& key,
-      const T& value, const std::string& metric_def_arg = "") {
-    return RegisterMetric(new SimpleMetric<T, TMetricKind::PROPERTY>(
-        MetricDefs::Get(key, metric_def_arg), value));
+  DoubleGauge* AddDoubleGauge(const std::string& key, const double value,
+      const std::string& metric_def_arg = "") {
+    return RegisterMetric(new DoubleGauge(MetricDefs::Get(key, metric_def_arg), value));
   }
 
   template<typename T>
-  SimpleMetric<T, TMetricKind::COUNTER>* AddCounter(const std::string& key,
+  LockedMetric<T, TMetricKind::PROPERTY>* AddProperty(const std::string& key,
       const T& value, const std::string& metric_def_arg = "") {
-    return RegisterMetric(new SimpleMetric<T, TMetricKind::COUNTER>(
+    return RegisterMetric(new LockedMetric<T, TMetricKind::PROPERTY>(
         MetricDefs::Get(key, metric_def_arg), value));
   }
 
+  IntCounter* AddCounter(const std::string& key, const int64_t value,
+      const std::string& metric_def_arg = "") {
+    return RegisterMetric(new IntCounter(MetricDefs::Get(key, metric_def_arg), value));
+  }
+
   /// Returns a metric by key. All MetricGroups reachable from this group are searched in
   /// depth-first order, starting with the root group.  Returns NULL if there is no metric
   /// with that key. This is not a very cheap operation; the result should be cached where
@@ -380,13 +415,6 @@ class MetricGroup {
       rapidjson::Document* document);
 };
 
-/// We write 'Int' as a placeholder for all integer types.
-typedef class SimpleMetric<int64_t, TMetricKind::GAUGE> IntGauge;
-typedef class SimpleMetric<double, TMetricKind::GAUGE> DoubleGauge;
-typedef class SimpleMetric<int64_t, TMetricKind::COUNTER> IntCounter;
-
-typedef class SimpleMetric<bool, TMetricKind::PROPERTY> BooleanProperty;
-typedef class SimpleMetric<std::string, TMetricKind::PROPERTY> StringProperty;
 
 /// Convenience method to instantiate a TMetricDef with a subset of its fields defined.
 /// Most externally-visible metrics should be defined in metrics.json and retrieved via

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index 536119b..8397f35 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -194,10 +194,8 @@ Status ThreadMgr::StartInstrumentation(MetricGroup* metrics) {
   DCHECK(metrics != NULL);
   lock_guard<mutex> l(lock_);
   metrics_enabled_ = true;
-  total_threads_metric_ = metrics->AddGauge<int64_t>(
-      "thread-manager.total-threads-created", 0L);
-  current_num_threads_metric_ = metrics->AddGauge<int64_t>(
-      "thread-manager.running-threads", 0L);
+  total_threads_metric_ = metrics->AddGauge("thread-manager.total-threads-created", 0L);
+  current_num_threads_metric_ = metrics->AddGauge("thread-manager.running-threads", 0L);
   return Status::OK();
 }
 
@@ -224,7 +222,7 @@ void ThreadMgr::RemoveThread(const thread::id& boost_id, const string& category)
 void ThreadMgr::GetThreadOverview(Document* document) {
   lock_guard<mutex> l(lock_);
   if (metrics_enabled_) {
-    document->AddMember("total_threads", current_num_threads_metric_->value(),
+    document->AddMember("total_threads", current_num_threads_metric_->GetValue(),
         document->GetAllocator());
   }
   Value lst(kArrayType);

http://git-wip-us.apache.org/repos/asf/impala/blob/e714f2b3/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index dafe986..f493d33 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -882,7 +882,7 @@
       "IMPALAD"
     ],
     "label": "StateStore Subscriber Last Recovery Duration",
-    "units": "NONE",
+    "units": "TIME_S",
     "kind": "GAUGE",
     "key": "statestore-subscriber.last-recovery-duration"
   },


[2/3] impala git commit: Revert "IMPALA-5528: Upgrade GPerfTools to 2.6.3 and tune TCMalloc for KRPC"

Posted by kw...@apache.org.
Revert "IMPALA-5528: Upgrade GPerfTools to 2.6.3 and tune TCMalloc for KRPC"

This reverts commit df3a440fff38225a03879955c99a87d8ced3b13a.

Apparently, linking Impalad against GPerfTools 2.6.3 caused Impalad to fail
on certain platforms (OLE6). The failure's symptom is SIGSEGV when trying to
exec Impalad binary. It's unclear which commit in GPerfTools could have caused
it so backing up this change to allow Impala to unbreak some platforms for now.

Change-Id: I97cccca74fb199d6ff0a42fe818f8789a0d66e83
Reviewed-on: http://gerrit.cloudera.org:8080/9057
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b3d38b5c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b3d38b5c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b3d38b5c

Branch: refs/heads/master
Commit: b3d38b5c8650fb455ca556f321fb7aea35dbd5ee
Parents: 028a83e
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Jan 17 19:27:17 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 18 23:25:09 2018 +0000

----------------------------------------------------------------------
 .../runtime/bufferpool/buffer-allocator-test.cc |  7 -------
 be/src/runtime/bufferpool/free-list-test.cc     | 13 +-----------
 be/src/runtime/bufferpool/suballocator-test.cc  | 13 +-----------
 be/src/runtime/exec-env.cc                      | 21 ++++++++++----------
 bin/impala-config.sh                            |  4 ++--
 5 files changed, 14 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b3d38b5c/be/src/runtime/bufferpool/buffer-allocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
index 2b87278..21a9c08 100644
--- a/be/src/runtime/bufferpool/buffer-allocator-test.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -22,8 +22,6 @@
 #include "runtime/bufferpool/buffer-pool-internal.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/system-allocator.h"
-#include "runtime/test-env.h"
-#include "service/fe-support.h"
 #include "testutil/cpu-util.h"
 #include "testutil/gtest-util.h"
 #include "util/cpu-info.h"
@@ -41,8 +39,6 @@ using BufferHandle = BufferPool::BufferHandle;
 class BufferAllocatorTest : public ::testing::Test {
  public:
   virtual void SetUp() {
-    test_env_.reset(new TestEnv);
-    ASSERT_OK(test_env_->Init());
     dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0, 0));
     dummy_reservation_.InitRootTracker(nullptr, 0);
     ASSERT_OK(dummy_pool_->RegisterClient("", nullptr, &dummy_reservation_, nullptr, 0,
@@ -63,8 +59,6 @@ class BufferAllocatorTest : public ::testing::Test {
   /// The minimum buffer size used in most tests.
   const static int64_t TEST_BUFFER_LEN = 1024;
 
-  boost::scoped_ptr<TestEnv> test_env_;
-
   ObjectPool obj_pool_;
 
   /// Need a dummy pool and client to pass around. We bypass the reservation mechanisms
@@ -206,7 +200,6 @@ TEST_F(SystemAllocatorTest, LargeAllocFailure) {
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
-  impala::InitFeSupport();
   int result = 0;
   for (bool mmap : {false, true}) {
     for (bool madvise : {false, true}) {

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d38b5c/be/src/runtime/bufferpool/free-list-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/free-list-test.cc b/be/src/runtime/bufferpool/free-list-test.cc
index d3c4c9a..7cb80b4 100644
--- a/be/src/runtime/bufferpool/free-list-test.cc
+++ b/be/src/runtime/bufferpool/free-list-test.cc
@@ -21,8 +21,6 @@
 #include "common/object-pool.h"
 #include "runtime/bufferpool/free-list.h"
 #include "runtime/bufferpool/system-allocator.h"
-#include "runtime/test-env.h"
-#include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 #include "testutil/rand-util.h"
 
@@ -33,8 +31,6 @@ namespace impala {
 class FreeListTest : public ::testing::Test {
  protected:
   virtual void SetUp() override {
-    test_env_.reset(new TestEnv);
-    ASSERT_OK(test_env_->Init());
     allocator_ = obj_pool_.Add(new SystemAllocator(MIN_BUFFER_LEN));
     RandTestUtil::SeedRng("FREE_LIST_TEST_SEED", &rng_);
   }
@@ -75,8 +71,6 @@ class FreeListTest : public ::testing::Test {
 
   const static int MIN_BUFFER_LEN = 1024;
 
-  boost::scoped_ptr<TestEnv> test_env_;
-
   /// Per-test random number generator. Seeded before every test.
   std::mt19937 rng_;
 
@@ -163,9 +157,4 @@ TEST_F(FreeListTest, ReturnOrder) {
 }
 }
 
-int main(int argc, char** argv) {
-  ::testing::InitGoogleTest(&argc, argv);
-  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
-  impala::InitFeSupport();
-  return RUN_ALL_TESTS();
-}
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d38b5c/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc
index 470b065..6cd53fb 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -27,8 +27,6 @@
 #include "common/object-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/bufferpool/suballocator.h"
-#include "runtime/test-env.h"
-#include "service/fe-support.h"
 #include "testutil/death-test-util.h"
 #include "testutil/gtest-util.h"
 #include "testutil/rand-util.h"
@@ -46,8 +44,6 @@ namespace impala {
 class SuballocatorTest : public ::testing::Test {
  public:
   virtual void SetUp() override {
-    test_env_.reset(new TestEnv);
-    ASSERT_OK(test_env_->Init());
     RandTestUtil::SeedRng("SUBALLOCATOR_TEST_SEED", &rng_);
     profile_ = RuntimeProfile::Create(&obj_pool_, "test profile");
   }
@@ -115,8 +111,6 @@ class SuballocatorTest : public ::testing::Test {
   /// Clients for the buffer pool. Deregistered and freed after every test.
   vector<unique_ptr<BufferPool::ClientHandle>> clients_;
 
-  boost::scoped_ptr<TestEnv> test_env_;
-
   /// Global profile - recreated for every test.
   RuntimeProfile* profile_;
 
@@ -368,9 +362,4 @@ void SuballocatorTest::AssertMemoryValid(
 }
 }
 
-int main(int argc, char** argv) {
-  ::testing::InitGoogleTest(&argc, argv);
-  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
-  impala::InitFeSupport();
-  return RUN_ALL_TESTS();
-}
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d38b5c/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 94ca834..6d9a857 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -271,6 +271,16 @@ Status ExecEnv::Init() {
           "bytes value or percentage: $0", FLAGS_mem_limit));
   }
 
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
+  // Aggressive decommit is required so that unused pages in the TCMalloc page heap are
+  // not backed by physical pages and do not contribute towards memory consumption.
+  // Enable it in TCMalloc before InitBufferPool().
+  if (!MallocExtension::instance()->SetNumericProperty(
+          "tcmalloc.aggressive_memory_decommit", 1)) {
+    return Status("Failed to enable TCMalloc aggressive decommit.");
+  }
+#endif
+
   if (!BitUtil::IsPowerOf2(FLAGS_min_buffer_size)) {
     return Status(Substitute(
         "--min_buffer_size must be a power-of-two: $0", FLAGS_min_buffer_size));
@@ -310,11 +320,6 @@ Status ExecEnv::Init() {
         FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
     RETURN_IF_ERROR(rpc_mgr_->RegisterService(num_svc_threads,
         FLAGS_datastream_service_queue_depth, move(data_svc)));
-    // Bump thread cache to 1GB to reduce contention for TCMalloc central
-    // list's spinlock.
-    if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
-      FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
-    }
   }
 
   mem_tracker_.reset(
@@ -427,12 +432,6 @@ Status ExecEnv::StartKrpcService() {
 
 void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity,
     int64_t clean_pages_limit) {
-#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
-  // Aggressive decommit is required so that unused pages in the TCMalloc page heap are
-  // not backed by physical pages and do not contribute towards memory consumption.
-  MallocExtension::instance()->SetNumericProperty(
-      "tcmalloc.aggressive_memory_decommit", 1);
-#endif
   buffer_pool_.reset(new BufferPool(min_buffer_size, capacity, clean_pages_limit));
   buffer_reservation_.reset(new ReservationTracker());
   buffer_reservation_->InitRootTracker(nullptr, capacity);

http://git-wip-us.apache.org/repos/asf/impala/blob/b3d38b5c/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index f5b67a9..058dc01 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -72,7 +72,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=4-7847dc86c4
+export IMPALA_TOOLCHAIN_BUILD_ID=482-c2361403fc
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
@@ -99,7 +99,7 @@ export IMPALA_GFLAGS_VERSION=2.2.0-p1
 unset IMPALA_GFLAGS_URL
 export IMPALA_GLOG_VERSION=0.3.4-p2
 unset IMPALA_GLOG_URL
-export IMPALA_GPERFTOOLS_VERSION=2.6.3
+export IMPALA_GPERFTOOLS_VERSION=2.5
 unset IMPALA_GPERFTOOLS_URL
 export IMPALA_GTEST_VERSION=1.6.0
 unset IMPALA_GTEST_URL