You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/08/15 16:30:32 UTC

[kudu] branch master updated: [metrics] Merge metrics by the same attribute

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new fe6e5cc  [metrics] Merge metrics by the same attribute
fe6e5cc is described below

commit fe6e5cc0c9c1573de174d1ce7838b449373ae36e
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Tue Jun 11 20:40:12 2019 +0800

    [metrics] Merge metrics by the same attribute
    
    This patch merges together metrics that have the same value for a given
    attribute, in order to reduce the total amount of data received by
    third-party monitoring systems that do not care about the original
    metric details.
    For example, fetch metrics from a tserver using:
    http://<host>:<port>/metrics?merge_rules=tablet|table|table_name
    
    All metrics of 'tablet' type entities that have the same value for the
    'table_name' attribute will be merged together into a new 'table' type
    entity, and their metric values will be aggregated.
    
    Change-Id: I8db3d082ae847eb1d83b9e4aee57d5e4bf13e1b5
    Reviewed-on: http://gerrit.cloudera.org:8080/13580
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/server/default_path_handlers.cc |  23 +-
 src/kudu/util/hdr_histogram-test.cc      |  28 +++
 src/kudu/util/hdr_histogram.cc           |  61 +++--
 src/kudu/util/hdr_histogram.h            |   7 +
 src/kudu/util/metrics-test.cc            | 377 +++++++++++++++++++++++++++++--
 src/kudu/util/metrics.cc                 | 241 ++++++++++++++++----
 src/kudu/util/metrics.h                  | 313 ++++++++++++++++++-------
 7 files changed, 874 insertions(+), 176 deletions(-)

diff --git a/src/kudu/server/default_path_handlers.cc b/src/kudu/server/default_path_handlers.cc
index 6a725fb..0397144 100644
--- a/src/kudu/server/default_path_handlers.cc
+++ b/src/kudu/server/default_path_handlers.cc
@@ -343,17 +343,30 @@ static void WriteMetricsAsJson(const MetricRegistry* const metrics,
   MetricJsonOptions opts;
   opts.include_raw_histograms = ParseBool(req.parsed_args, "include_raw_histograms");
   opts.include_schema_info = ParseBool(req.parsed_args, "include_schema");
-  opts.entity_types = ParseArray(req.parsed_args, "types");
-  opts.entity_ids =  ParseArray(req.parsed_args, "ids");
-  opts.entity_attrs = ParseArray(req.parsed_args, "attributes");
-  opts.entity_metrics = ParseArray(req.parsed_args, "metrics");
+
+  MetricFilters& filters = opts.filters;
+  filters.entity_types = ParseArray(req.parsed_args, "types");
+  filters.entity_ids =  ParseArray(req.parsed_args, "ids");
+  filters.entity_attrs = ParseArray(req.parsed_args, "attributes");
+  filters.entity_metrics = ParseArray(req.parsed_args, "metrics");
+  vector<string> merge_rules = ParseArray(req.parsed_args, "merge_rules");
+  for (const auto& merge_rule : merge_rules) {
+    vector<string> values;
+    SplitStringUsing(merge_rule, "|", &values);
+    if (values.size() == 3) {
+      // Index 0: entity type needed to be merged.
+      // Index 1: 'merge_to' field of MergeAttributes.
+      // Index 2: 'attribute_to_merge_by' field of MergeAttributes.
+      EmplaceIfNotPresent(&opts.merge_rules, values[0], MergeAttributes(values[1], values[2]));
+    }
+  }
 
   JsonWriter::Mode json_mode = ParseBool(req.parsed_args, "compact") ?
       JsonWriter::COMPACT : JsonWriter::PRETTY;
 
   // The number of entity_attrs should always be even because
   // each pair represents a key and a value.
-  if (opts.entity_attrs.size() % 2 != 0) {
+  if (filters.entity_attrs.size() % 2 != 0) {
     resp->status_code = HttpStatusCode::BadRequest;
     WARN_NOT_OK(Status::InvalidArgument(""), "The parameter of 'attributes' is wrong");
   } else {
diff --git a/src/kudu/util/hdr_histogram-test.cc b/src/kudu/util/hdr_histogram-test.cc
index 5d51e98..22a9ab6 100644
--- a/src/kudu/util/hdr_histogram-test.cc
+++ b/src/kudu/util/hdr_histogram-test.cc
@@ -113,4 +113,32 @@ TEST_F(HdrHistogramTest, PercentileAndCopyTest) {
   ASSERT_EQ(hist.TotalSum(), copy.TotalSum());
 }
 
+void PopulateHistogram(HdrHistogram* histogram, uint64_t low, uint64_t high) {
+  for (uint64_t i = low; i <= high; i++) {
+    histogram->Increment(i);
+  }
+}
+
+TEST_F(HdrHistogramTest, MergeTest) {
+  uint64_t highest_val = 10000LU;
+
+  HdrHistogram hist(highest_val, kSigDigits);
+  HdrHistogram other(highest_val, kSigDigits);
+
+  PopulateHistogram(&hist, 1, 100);
+  PopulateHistogram(&other, 101, 250);
+  HdrHistogram old(hist);
+  hist.MergeFrom(other);
+
+  ASSERT_EQ(hist.TotalCount(), old.TotalCount() + other.TotalCount());
+  ASSERT_EQ(hist.TotalSum(), old.TotalSum() + other.TotalSum());
+  ASSERT_EQ(hist.MinValue(), 1);
+  ASSERT_EQ(hist.MaxValue(), 250);
+  ASSERT_NEAR(hist.MeanValue(), (1 + 250) / 2.0, 1e3);
+  ASSERT_EQ(hist.ValueAtPercentile(100.0), 250);
+  ASSERT_NEAR(hist.ValueAtPercentile(99.0), 250 * 99.0 / 100, 1e3);
+  ASSERT_NEAR(hist.ValueAtPercentile(95.0), 250 * 95.0 / 100, 1e3);
+  ASSERT_NEAR(hist.ValueAtPercentile(50.0), 250 * 50.0 / 100, 1e3);
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/hdr_histogram.cc b/src/kudu/util/hdr_histogram.cc
index 7e1f1ff..958d89c 100644
--- a/src/kudu/util/hdr_histogram.cc
+++ b/src/kudu/util/hdr_histogram.cc
@@ -155,6 +155,26 @@ void HdrHistogram::Increment(int64_t value) {
   IncrementBy(value, 1);
 }
 
+void HdrHistogram::UpdateMinMax(int64_t min, int64_t max) {
+  // Update min, if needed.
+  {
+    Atomic64 min_val;
+    while (PREDICT_FALSE(min < (min_val = MinValue()))) {
+      Atomic64 old_val = NoBarrier_CompareAndSwap(&min_value_, min_val, min);
+      if (PREDICT_TRUE(old_val == min_val)) break; // CAS success.
+    }
+  }
+
+  // Update max, if needed.
+  {
+    Atomic64 max_val;
+    while (PREDICT_FALSE(max > (max_val = MaxValue()))) {
+      Atomic64 old_val = NoBarrier_CompareAndSwap(&max_value_, max_val, max);
+      if (PREDICT_TRUE(old_val == max_val)) break; // CAS success.
+    }
+  }
+}
+
 void HdrHistogram::IncrementBy(int64_t value, int64_t count) {
   DCHECK_GE(value, 0);
   DCHECK_GE(count, 0);
@@ -170,23 +190,7 @@ void HdrHistogram::IncrementBy(int64_t value, int64_t count) {
   NoBarrier_AtomicIncrement(&total_count_, count);
   NoBarrier_AtomicIncrement(&total_sum_, value * count);
 
-  // Update min, if needed.
-  {
-    Atomic64 min_val;
-    while (PREDICT_FALSE(value < (min_val = MinValue()))) {
-      Atomic64 old_val = NoBarrier_CompareAndSwap(&min_value_, min_val, value);
-      if (PREDICT_TRUE(old_val == min_val)) break; // CAS success.
-    }
-  }
-
-  // Update max, if needed.
-  {
-    Atomic64 max_val;
-    while (PREDICT_FALSE(value > (max_val = MaxValue()))) {
-      Atomic64 old_val = NoBarrier_CompareAndSwap(&max_value_, max_val, value);
-      if (PREDICT_TRUE(old_val == max_val)) break; // CAS success.
-    }
-  }
+  UpdateMinMax(value, value);
 }
 
 void HdrHistogram::IncrementWithExpectedInterval(int64_t value,
@@ -343,6 +347,29 @@ void HdrHistogram::DumpHumanReadable(std::ostream* out) const {
   }
 }
 
+void HdrHistogram::MergeFrom(const HdrHistogram& other) {
+  DCHECK_EQ(highest_trackable_value_, other.highest_trackable_value());
+  DCHECK_EQ(num_significant_digits_, other.num_significant_digits());
+  DCHECK_EQ(counts_array_length_, other.counts_array_length_);
+  DCHECK_EQ(bucket_count_, other.bucket_count_);
+  DCHECK_EQ(sub_bucket_count_, other.sub_bucket_count_);
+  DCHECK_EQ(sub_bucket_half_count_magnitude_, other.sub_bucket_half_count_magnitude_);
+  DCHECK_EQ(sub_bucket_half_count_, other.sub_bucket_half_count_);
+  DCHECK_EQ(sub_bucket_mask_, other.sub_bucket_mask_);
+
+  NoBarrier_AtomicIncrement(&total_count_, other.total_count_);
+  NoBarrier_AtomicIncrement(&total_sum_, other.total_sum_);
+
+  UpdateMinMax(other.min_value_, other.max_value_);
+
+  for (int i = 0; i < counts_array_length_; i++) {
+    Atomic64 count = NoBarrier_Load(&other.counts_[i]);
+    if (count > 0) {
+      NoBarrier_AtomicIncrement(&counts_[i], count);
+    }
+  }
+}
+
 ///////////////////////////////////////////////////////////////////////
 // AbstractHistogramIterator
 ///////////////////////////////////////////////////////////////////////
diff --git a/src/kudu/util/hdr_histogram.h b/src/kudu/util/hdr_histogram.h
index 934875b..fa139e2 100644
--- a/src/kudu/util/hdr_histogram.h
+++ b/src/kudu/util/hdr_histogram.h
@@ -170,6 +170,12 @@ class HdrHistogram {
 
   // Dump a formatted, multiline string describing this histogram to 'out'.
   void DumpHumanReadable(std::ostream* out) const;
+
+  // Merges 'other' into this HdrHistogram. Values in each 'counts_' array
+  // bucket will be added up, and the related 'min_value_', 'max_value_',
+  // 'total_count_' and 'total_sum_' will be updated if needed.
+  void MergeFrom(const HdrHistogram& other);
+
  private:
   friend class AbstractHistogramIterator;
 
@@ -179,6 +185,7 @@ class HdrHistogram {
 
   void Init();
   int CountsArrayIndex(int bucket_index, int sub_bucket_index) const;
+  void UpdateMinMax(int64_t min, int64_t max);
 
   uint64_t highest_trackable_value_;
   int num_significant_digits_;
diff --git a/src/kudu/util/metrics-test.cc b/src/kudu/util/metrics-test.cc
index ea2e8ca..4b23214 100644
--- a/src/kudu/util/metrics-test.cc
+++ b/src/kudu/util/metrics-test.cc
@@ -18,10 +18,13 @@
 #include "kudu/util/metrics.h"
 
 #include <cstdint>
+#include <map>
+#include <memory>
 #include <ostream>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
@@ -31,7 +34,7 @@
 
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/bind_helpers.h"
-#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/hdr_histogram.h"
@@ -60,12 +63,23 @@ class MetricsTest : public KuduTest {
   void SetUp() override {
     KuduTest::SetUp();
 
-    entity_ = METRIC_ENTITY_test_entity.Instantiate(&registry_, "my-test");
+    entity_ = METRIC_ENTITY_test_entity.Instantiate(&registry_, "my-test-same-attr1");
+    entity_->SetAttribute("attr_for_merge", "same_attr");
+
+    entity_same_attr_ = METRIC_ENTITY_test_entity.Instantiate(&registry_, "my-test-same-attr2");
+    entity_same_attr_->SetAttribute("attr_for_merge", "same_attr");
+
+    entity_diff_attr_ = METRIC_ENTITY_test_entity.Instantiate(&registry_, "my-test-diff-attr");
+    entity_diff_attr_->SetAttribute("attr_for_merge", "diff_attr");
   }
 
  protected:
+  const int kEntityCount = 3;
+
   MetricRegistry registry_;
   scoped_refptr<MetricEntity> entity_;
+  scoped_refptr<MetricEntity> entity_same_attr_;
+  scoped_refptr<MetricEntity> entity_diff_attr_;
 };
 
 METRIC_DEFINE_counter(test_entity, test_counter, "My Test Counter", MetricUnit::kRequests,
@@ -82,6 +96,78 @@ TEST_F(MetricsTest, SimpleCounterTest) {
   ASSERT_EQ(3, requests->value());
 }
 
+TEST_F(MetricsTest, SimpleCounterMergeTest) {
+  scoped_refptr<Counter> requests =
+    new Counter(&METRIC_test_counter);
+  scoped_refptr<Counter> requests_for_merge =
+    new Counter(&METRIC_test_counter);
+  requests->IncrementBy(2);
+  requests_for_merge->IncrementBy(3);
+  ASSERT_TRUE(requests_for_merge->MergeFrom(requests));
+  ASSERT_EQ(2, requests->value());
+  ASSERT_EQ(5, requests_for_merge->value());
+  requests->IncrementBy(7);
+  ASSERT_TRUE(requests_for_merge->MergeFrom(requests));
+  ASSERT_EQ(9, requests->value());
+  ASSERT_EQ(14, requests_for_merge->value());
+  ASSERT_TRUE(requests_for_merge->MergeFrom(requests_for_merge));
+  ASSERT_EQ(14, requests_for_merge->value());
+}
+
+METRIC_DEFINE_gauge_string(test_entity, test_string_gauge, "Test string Gauge",
+                           MetricUnit::kState, "Description of string Gauge");
+
+TEST_F(MetricsTest, SimpleStringGaugeTest) {
+  scoped_refptr<StringGauge> state =
+    new StringGauge(&METRIC_test_string_gauge, "Healthy");
+  ASSERT_EQ(METRIC_test_string_gauge.description(), state->prototype()->description());
+  ASSERT_EQ("Healthy", state->value());
+  state->set_value("Under-replicated");
+  ASSERT_EQ("Under-replicated", state->value());
+  state->set_value("Recovering");
+  ASSERT_EQ("Recovering", state->value());
+}
+
+TEST_F(MetricsTest, SimpleStringGaugeForMergeTest) {
+  scoped_refptr<StringGauge> state =
+    new StringGauge(&METRIC_test_string_gauge, "Healthy");
+  scoped_refptr<StringGauge> state_for_merge =
+    new StringGauge(&METRIC_test_string_gauge, "Recovering");
+  ASSERT_TRUE(state_for_merge->MergeFrom(state));
+  ASSERT_EQ("Healthy", state->value());
+  ASSERT_EQ(unordered_set<string>({"Recovering", "Healthy"}),
+            state_for_merge->unique_values());
+
+  scoped_refptr<StringGauge> state_old =
+      down_cast<StringGauge*>(state->snapshot().get());
+  ASSERT_EQ("Healthy", state_old->value());
+  scoped_refptr<StringGauge> state_for_merge_old =
+      down_cast<StringGauge*>(state_for_merge->snapshot().get());
+  ASSERT_EQ(unordered_set<string>({"Recovering", "Healthy"}),
+            state_for_merge_old->unique_values());
+
+  state_old->set_value("Unavailable");
+  ASSERT_EQ("Unavailable", state_old->value());
+  ASSERT_TRUE(state_for_merge_old->MergeFrom(state_old));
+  ASSERT_EQ(unordered_set<string>({"Unavailable", "Recovering", "Healthy"}),
+            state_for_merge_old->unique_values());
+
+  state->set_value("Under-replicated");
+  ASSERT_TRUE(state_for_merge->MergeFrom(state));
+  ASSERT_EQ(unordered_set<string>({"Under-replicated", "Healthy", "Recovering"}),
+            state_for_merge->unique_values());
+
+  ASSERT_TRUE(state_for_merge->MergeFrom(state_for_merge_old));
+  ASSERT_EQ(unordered_set<string>({"Unavailable", "Healthy", "Recovering"}),
+            state_for_merge_old->unique_values());
+  ASSERT_EQ(unordered_set<string>({"Unavailable", "Under-replicated", "Healthy", "Recovering"}),
+            state_for_merge->unique_values());
+
+  ASSERT_TRUE(state_for_merge->MergeFrom(state_for_merge));
+  ASSERT_EQ(unordered_set<string>({"Unavailable", "Under-replicated", "Healthy", "Recovering"}),
+            state_for_merge->unique_values());
+}
+
 METRIC_DEFINE_gauge_uint64(test_entity, test_gauge, "Test uint64 Gauge",
                            MetricUnit::kBytes, "Description of Test Gauge");
 
@@ -96,6 +182,22 @@ TEST_F(MetricsTest, SimpleAtomicGaugeTest) {
   ASSERT_EQ(5, mem_usage->value());
 }
 
+TEST_F(MetricsTest, SimpleAtomicGaugeMergeTest) {
+  scoped_refptr<AtomicGauge<uint64_t> > mem_usage =
+    METRIC_test_gauge.Instantiate(entity_, 2);
+  scoped_refptr<AtomicGauge<uint64_t> > mem_usage_for_merge =
+    METRIC_test_gauge.Instantiate(entity_same_attr_, 3);
+  ASSERT_TRUE(mem_usage_for_merge->MergeFrom(mem_usage));
+  ASSERT_EQ(2, mem_usage->value());
+  ASSERT_EQ(5, mem_usage_for_merge->value());
+  mem_usage->IncrementBy(7);
+  ASSERT_TRUE(mem_usage_for_merge->MergeFrom(mem_usage));
+  ASSERT_EQ(9, mem_usage->value());
+  ASSERT_EQ(14, mem_usage_for_merge->value());
+  ASSERT_TRUE(mem_usage_for_merge->MergeFrom(mem_usage_for_merge));
+  ASSERT_EQ(14, mem_usage_for_merge->value());
+}
+
 METRIC_DEFINE_gauge_int64(test_entity, test_func_gauge, "Test Function Gauge",
                           MetricUnit::kBytes, "Test Gauge 2");
 
@@ -122,6 +224,78 @@ TEST_F(MetricsTest, SimpleFunctionGaugeTest) {
   ASSERT_EQ(2, gauge->value());
 }
 
+METRIC_DEFINE_gauge_int64(test_entity, test_func_gauge_snapshot, "Test Function Gauge snapshot",
+                          MetricUnit::kOperations, "Description of Function Gauge snapshot");
+class FunctionGaugeOwner {
+ public:
+  explicit FunctionGaugeOwner(const scoped_refptr<MetricEntity>& entity) {
+    METRIC_test_func_gauge_snapshot.InstantiateFunctionGauge(entity,
+       Bind(&FunctionGaugeOwner::Count, Unretained(this)))
+       ->AutoDetach(&metric_detacher_);
+  }
+
+  int64_t Count() {
+    return count_++;
+  }
+
+ private:
+  int64_t count_ = 0;
+  FunctionGaugeDetacher metric_detacher_;
+};
+
+TEST_F(MetricsTest, SimpleFunctionGaugeSnapshotTest) {
+  std::unique_ptr<FunctionGaugeOwner> fgo(new FunctionGaugeOwner(entity_));
+  scoped_refptr<FunctionGauge<int64_t>> old_metric = down_cast<FunctionGauge<int64_t>*>(
+    entity_->FindOrNull(METRIC_test_func_gauge_snapshot).get());
+  ASSERT_EQ(0, old_metric->value());
+  ASSERT_EQ(1, old_metric->value());  // old_metric increased to 2.
+  // old_metric increased to 3, and new_metric stay at 2.
+  scoped_refptr<FunctionGauge<int64_t>> new_metric =
+      down_cast<FunctionGauge<int64_t>*>(old_metric->snapshot().get());
+  ASSERT_EQ(3, old_metric->value());
+  ASSERT_EQ(4, old_metric->value());
+  ASSERT_EQ(2, new_metric->value());
+  ASSERT_EQ(2, new_metric->value());
+}
+
+TEST_F(MetricsTest, ReleasableFunctionGaugeSnapshotTest) {
+  scoped_refptr<FunctionGauge<int64_t>> new_metric;
+  {
+    std::unique_ptr<FunctionGaugeOwner> fgo(new FunctionGaugeOwner(entity_));
+    scoped_refptr<FunctionGauge<int64_t>> old_metric = down_cast<FunctionGauge<int64_t>*>(
+      entity_->FindOrNull(METRIC_test_func_gauge_snapshot).get());
+    ASSERT_EQ(0, old_metric->value());
+    ASSERT_EQ(1, old_metric->value());  // old_metric increased to 2.
+    // old_metric increased to 3, and new_metric stay at 2.
+    new_metric = down_cast<FunctionGauge<int64_t>*>(old_metric->snapshot().get());
+  }
+  ASSERT_EQ(2, new_metric->value());
+  ASSERT_EQ(2, new_metric->value());
+}
+
+TEST_F(MetricsTest, SimpleFunctionGaugeMergeTest) {
+  int metric_val = 1000;
+  scoped_refptr<FunctionGauge<int64_t> > gauge =
+    METRIC_test_func_gauge.InstantiateFunctionGauge(
+      entity_, Bind(&MyFunction, Unretained(&metric_val)));
+
+  int metric_val_for_merge = 1234;
+  scoped_refptr<FunctionGauge<int64_t> > gauge_for_merge =
+    METRIC_test_func_gauge.InstantiateFunctionGauge(
+      entity_same_attr_, Bind(&MyFunction, Unretained(&metric_val_for_merge)));
+
+  ASSERT_TRUE(gauge_for_merge->MergeFrom(gauge));
+  ASSERT_EQ(1001, gauge->value());
+  ASSERT_EQ(1002, gauge->value());
+  ASSERT_EQ(2234, gauge_for_merge->value());
+  ASSERT_EQ(2234, gauge_for_merge->value());
+  ASSERT_TRUE(gauge_for_merge->MergeFrom(gauge));
+  ASSERT_EQ(3237, gauge_for_merge->value());
+  ASSERT_EQ(3237, gauge_for_merge->value());
+  ASSERT_TRUE(gauge_for_merge->MergeFrom(gauge_for_merge));
+  ASSERT_EQ(3237, gauge_for_merge->value());
+}
+
 TEST_F(MetricsTest, AutoDetachToLastValue) {
   int metric_val = 1000;
   scoped_refptr<FunctionGauge<int64_t> > gauge =
@@ -173,12 +347,40 @@ TEST_F(MetricsTest, SimpleHistogramTest) {
   scoped_refptr<Histogram> hist = METRIC_test_hist.Instantiate(entity_);
   hist->Increment(2);
   hist->IncrementBy(4, 1);
-  ASSERT_EQ(2, hist->histogram_->MinValue());
-  ASSERT_EQ(3, hist->histogram_->MeanValue());
-  ASSERT_EQ(4, hist->histogram_->MaxValue());
-  ASSERT_EQ(2, hist->histogram_->TotalCount());
-  ASSERT_EQ(6, hist->histogram_->TotalSum());
-  // TODO: Test coverage needs to be improved a lot.
+  ASSERT_EQ(2, hist->histogram()->MinValue());
+  ASSERT_EQ(3, hist->histogram()->MeanValue());
+  ASSERT_EQ(4, hist->histogram()->MaxValue());
+  ASSERT_EQ(2, hist->histogram()->TotalCount());
+  ASSERT_EQ(6, hist->histogram()->TotalSum());
+  // TODO(someone): Test coverage needs to be improved a lot.
+}
+
+TEST_F(MetricsTest, SimpleHistogramMergeTest) {
+  scoped_refptr<Histogram> hist = METRIC_test_hist.Instantiate(entity_);
+  scoped_refptr<Histogram> hist_for_merge = METRIC_test_hist.Instantiate(entity_same_attr_);
+  hist->Increment(2);
+  hist->IncrementBy(6, 1);
+  hist_for_merge->Increment(1);
+  hist_for_merge->IncrementBy(3, 3);
+  ASSERT_TRUE(hist_for_merge->MergeFrom(hist));
+  ASSERT_EQ(2, hist->histogram()->MinValue());
+  ASSERT_EQ(4, hist->histogram()->MeanValue());
+  ASSERT_EQ(6, hist->histogram()->MaxValue());
+  ASSERT_EQ(2, hist->histogram()->TotalCount());
+  ASSERT_EQ(8, hist->histogram()->TotalSum());
+  ASSERT_EQ(1, hist_for_merge->histogram()->MinValue());
+  ASSERT_EQ(3, hist_for_merge->histogram()->MeanValue());
+  ASSERT_EQ(6, hist_for_merge->histogram()->MaxValue());
+  ASSERT_EQ(6, hist_for_merge->histogram()->TotalCount());
+  ASSERT_EQ(18, hist_for_merge->histogram()->TotalSum());
+  ASSERT_EQ(1, hist_for_merge->histogram()->ValueAtPercentile(20.0));
+  ASSERT_EQ(2, hist_for_merge->histogram()->ValueAtPercentile(30.0));
+  ASSERT_EQ(3, hist_for_merge->histogram()->ValueAtPercentile(50.0));
+  ASSERT_EQ(3, hist_for_merge->histogram()->ValueAtPercentile(90.0));
+  ASSERT_EQ(6, hist_for_merge->histogram()->ValueAtPercentile(100.0));
+  ASSERT_TRUE(hist_for_merge->MergeFrom(hist_for_merge));
+  ASSERT_EQ(6, hist_for_merge->histogram()->TotalCount());
+  ASSERT_EQ(18, hist_for_merge->histogram()->TotalSum());
 }
 
 TEST_F(MetricsTest, JsonPrintTest) {
@@ -228,7 +430,7 @@ TEST_F(MetricsTest, JsonPrintTest) {
     out.str("");
     JsonWriter writer(&out, JsonWriter::PRETTY);
     MetricJsonOptions opts;
-    opts.entity_metrics.emplace_back("test_count");
+    opts.filters.entity_metrics.emplace_back("test_count");
     ASSERT_OK(entity_->WriteAsJson(&writer, opts));
     ASSERT_STR_CONTAINS(out.str(), METRIC_test_counter.name());
     ASSERT_STR_NOT_CONTAINS(out.str(), METRIC_test_gauge.name());
@@ -239,7 +441,7 @@ TEST_F(MetricsTest, JsonPrintTest) {
     out.str("");
     JsonWriter writer(&out, JsonWriter::PRETTY);
     MetricJsonOptions opts;
-    opts.entity_metrics.emplace_back("not_a_matching_metric");
+    opts.filters.entity_metrics.emplace_back("not_a_matching_metric");
     ASSERT_OK(entity_->WriteAsJson(&writer, opts));
     ASSERT_EQ(out.str(), "");
   }
@@ -249,13 +451,142 @@ TEST_F(MetricsTest, JsonPrintTest) {
     out.str("");
     JsonWriter writer(&out, JsonWriter::PRETTY);
     MetricJsonOptions opts;
-    opts.entity_metrics.emplace_back("teST_coUNteR");
+    opts.filters.entity_metrics.emplace_back("teST_coUNteR");
     ASSERT_OK(entity_->WriteAsJson(&writer, opts));
     ASSERT_STR_CONTAINS(out.str(), METRIC_test_counter.name());
     ASSERT_STR_NOT_CONTAINS(out.str(), METRIC_test_gauge.name());
   }
 }
 
+// Check whether the JSON-formatted 'output' matches 'expect_counters'.
+void CheckMergeOutput(const std::ostringstream& output,
+                      std::map<std::string, int> expect_counters) {
+  JsonReader reader(output.str());
+  ASSERT_OK(reader.Init());
+
+  vector<const rapidjson::Value*> counters;
+  ASSERT_OK(reader.ExtractObjectArray(reader.root(), nullptr, &counters));
+  ASSERT_EQ(expect_counters.size(), counters.size());
+  for (const auto& counter : counters) {
+    string type;
+    ASSERT_OK(reader.ExtractString(counter, "type", &type));
+    ASSERT_EQ("merged_entity", type);
+    string id;
+    ASSERT_OK(reader.ExtractString(counter, "id", &id));
+    auto it = expect_counters.find(id);
+    ASSERT_NE(it, expect_counters.end());
+    vector<const rapidjson::Value*> metrics;
+    ASSERT_OK(reader.ExtractObjectArray(counter, "metrics", &metrics));
+    string metric_name;
+    ASSERT_OK(reader.ExtractString(metrics[0], "name", &metric_name));
+    ASSERT_EQ("test_counter", metric_name);
+    int64_t metric_value;
+    ASSERT_OK(reader.ExtractInt64(metrics[0], "value", &metric_value));
+    ASSERT_EQ(it->second, metric_value);
+    expect_counters.erase(it);
+  }
+  ASSERT_TRUE(expect_counters.empty());
+}
+
+TEST_F(MetricsTest, CollectTest) {
+  scoped_refptr<Counter> test_counter = METRIC_test_counter.Instantiate(entity_);
+  test_counter->Increment();
+
+  scoped_refptr<Counter> test_counter_same_attr =
+      METRIC_test_counter.Instantiate(entity_same_attr_);
+  test_counter_same_attr->IncrementBy(10);
+
+  scoped_refptr<Counter> test_counter_diff_attr =
+      METRIC_test_counter.Instantiate(entity_diff_attr_);
+  test_counter_diff_attr->IncrementBy(100);
+
+  MetricJsonOptions base_opts;
+  base_opts.merge_rules.emplace(
+      "test_entity",
+      MergeAttributes("merged_entity", "attr_for_merge"));
+
+  {
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::PRETTY);
+    ASSERT_OK(registry_.WriteAsJson(&writer, base_opts));
+    NO_FATALS(CheckMergeOutput(out, {{"same_attr", 11},
+                                     {"diff_attr", 100}}));
+  }
+
+  // Verify that metric filtering matches on substrings.
+  {
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::PRETTY);
+    MetricJsonOptions opts(base_opts);
+    opts.filters.entity_metrics.emplace_back("counter");
+    ASSERT_OK(registry_.WriteAsJson(&writer, opts));
+    NO_FATALS(CheckMergeOutput(out, {{"same_attr", 11},
+                                     {"diff_attr", 100}}));
+  }
+
+  // Verify that, if we filter for a metric that isn't in this entity, we get no result.
+  {
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::PRETTY);
+    MetricJsonOptions opts(base_opts);
+    opts.filters.entity_metrics.emplace_back("not_a_matching_metric");
+    ASSERT_OK(registry_.WriteAsJson(&writer, opts));
+    NO_FATALS(CheckMergeOutput(out, {}));
+  }
+
+  // Verify that metric entity id filtering matches on substrings.
+  {
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::PRETTY);
+    MetricJsonOptions opts(base_opts);
+    opts.filters.entity_ids.emplace_back("my-test-same-attr");
+    ASSERT_OK(registry_.WriteAsJson(&writer, opts));
+    NO_FATALS(CheckMergeOutput(out, {{"same_attr", 11}}));
+  }
+
+  {
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::PRETTY);
+    MetricJsonOptions opts(base_opts);
+    opts.filters.entity_ids.emplace_back("my-test-same-attr2");
+    ASSERT_OK(registry_.WriteAsJson(&writer, opts));
+    NO_FATALS(CheckMergeOutput(out, {{"same_attr", 10}}));
+  }
+
+  // Verify that, if we filter for a metric entity id that doesn't match any entity,
+  // we get no result.
+  {
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::PRETTY);
+    MetricJsonOptions opts(base_opts);
+    opts.filters.entity_ids.emplace_back("not_a_matching_metric_entity_id");
+    ASSERT_OK(registry_.WriteAsJson(&writer, opts));
+    NO_FATALS(CheckMergeOutput(out, {}));
+  }
+
+  // Verify that some attribute filtering matches on substrings.
+  {
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::PRETTY);
+    MetricJsonOptions opts(base_opts);
+    opts.filters.entity_attrs.emplace_back("attr_for_merge");
+    opts.filters.entity_attrs.emplace_back("same_attr");
+    ASSERT_OK(registry_.WriteAsJson(&writer, opts));
+    NO_FATALS(CheckMergeOutput(out, {{"same_attr", 11}}));
+  }
+
+  // Verify that, if we filter for an attribute that doesn't match any entity, we get no result.
+  {
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::PRETTY);
+    MetricJsonOptions opts(base_opts);
+    opts.filters.entity_attrs.emplace_back("attr_for_merge");
+    opts.filters.entity_attrs.emplace_back("not_a_matching_attr");
+    ASSERT_OK(registry_.WriteAsJson(&writer, opts));
+    NO_FATALS(CheckMergeOutput(out, {}));
+  }
+}
+
 // Test that metrics are retired when they are no longer referenced.
 TEST_F(MetricsTest, RetirementTest) {
   FLAGS_metrics_retirement_age_ms = 100;
@@ -285,10 +616,12 @@ TEST_F(MetricsTest, RetirementTest) {
 }
 
 TEST_F(MetricsTest, TestRetiringEntities) {
-  ASSERT_EQ(1, registry_.num_entities());
+  ASSERT_EQ(kEntityCount, registry_.num_entities());
 
   // Drop the reference to our entity.
   entity_.reset();
+  entity_same_attr_.reset();
+  entity_diff_attr_.reset();
 
   // Retire metrics. Since there is nothing inside our entity, it should
   // retire immediately (no need to loop).
@@ -456,7 +789,7 @@ TEST_F(MetricsTest, TestFilter) {
     {
       std::ostringstream out;
       MetricJsonOptions opts;
-      opts.entity_types = { not_exist_string };
+      opts.filters.entity_types = { not_exist_string };
       JsonWriter w(&out, JsonWriter::PRETTY);
       ASSERT_OK(registry_.WriteAsJson(&w, opts));
       ASSERT_EQ("[]", out.str());
@@ -465,12 +798,12 @@ TEST_F(MetricsTest, TestFilter) {
       const string entity_type = "test_entity";
       std::ostringstream out;
       MetricJsonOptions opts;
-      opts.entity_types = { entity_type };
+      opts.filters.entity_types = { entity_type };
       JsonWriter w(&out, JsonWriter::PRETTY);
       ASSERT_OK(registry_.WriteAsJson(&w, opts));
       rapidjson::Document d;
       d.Parse<0>(out.str().c_str());
-      ASSERT_EQ(kNum + 1, d.Size());
+      ASSERT_EQ(kNum + kEntityCount, d.Size());
       ASSERT_EQ(entity_type, d[rand.Next() % kNum]["type"].GetString());
     }
   }
@@ -479,7 +812,7 @@ TEST_F(MetricsTest, TestFilter) {
     {
       std::ostringstream out;
       MetricJsonOptions opts;
-      opts.entity_ids = { not_exist_string };
+      opts.filters.entity_ids = { not_exist_string };
       JsonWriter w(&out, JsonWriter::PRETTY);
       ASSERT_OK(registry_.WriteAsJson(&w, opts));
       ASSERT_EQ("[]", out.str());
@@ -488,7 +821,7 @@ TEST_F(MetricsTest, TestFilter) {
       const string& entity_id = id_uuids[rand.Next() % kNum];
       std::ostringstream out;
       MetricJsonOptions opts;
-      opts.entity_ids = { entity_id };
+      opts.filters.entity_ids = { entity_id };
       JsonWriter w(&out, JsonWriter::PRETTY);
       ASSERT_OK(registry_.WriteAsJson(&w, opts));
       rapidjson::Document d;
@@ -502,7 +835,7 @@ TEST_F(MetricsTest, TestFilter) {
     {
       std::ostringstream out;
       MetricJsonOptions opts;
-      opts.entity_attrs = { attr1, not_exist_string };
+      opts.filters.entity_attrs = { attr1, not_exist_string };
       JsonWriter w(&out, JsonWriter::PRETTY);
       ASSERT_OK(registry_.WriteAsJson(&w, opts));
       ASSERT_EQ("[]", out.str());
@@ -513,7 +846,7 @@ TEST_F(MetricsTest, TestFilter) {
       const string& attr2_uuid = attr2_uuids[i];
       std::ostringstream out;
       MetricJsonOptions opts;
-      opts.entity_attrs = { attr1, attr1_uuid };
+      opts.filters.entity_attrs = { attr1, attr1_uuid };
       JsonWriter w(&out, JsonWriter::PRETTY);
       ASSERT_OK(registry_.WriteAsJson(&w, opts));
       rapidjson::Document d;
@@ -528,7 +861,7 @@ TEST_F(MetricsTest, TestFilter) {
     {
       std::ostringstream out;
       MetricJsonOptions opts;
-      opts.entity_metrics = { not_exist_string };
+      opts.filters.entity_metrics = { not_exist_string };
       JsonWriter w(&out, JsonWriter::PRETTY);
       ASSERT_OK(registry_.WriteAsJson(&w, opts));
       ASSERT_EQ("[]", out.str());
@@ -538,7 +871,7 @@ TEST_F(MetricsTest, TestFilter) {
       const string entity_metric2 = "test_gauge";
       std::ostringstream out;
       MetricJsonOptions opts;
-      opts.entity_metrics = { entity_metric1 };
+      opts.filters.entity_metrics = { entity_metric1 };
       JsonWriter w(&out, JsonWriter::PRETTY);
       ASSERT_OK(registry_.WriteAsJson(&w, opts));
       ASSERT_STR_CONTAINS(out.str(), entity_metric1);
@@ -555,7 +888,7 @@ TEST_F(MetricsTest, TestFilter) {
     ASSERT_OK(registry_.WriteAsJson(&w, MetricJsonOptions()));
     rapidjson::Document d;
     d.Parse<0>(out.str().c_str());
-    ASSERT_EQ(kNum + 1, d.Size());
+    ASSERT_EQ(kNum + kEntityCount, d.Size());
   }
 }
 
diff --git a/src/kudu/util/metrics.cc b/src/kudu/util/metrics.cc
index 1741c11..4df2874 100644
--- a/src/kudu/util/metrics.cc
+++ b/src/kudu/util/metrics.cc
@@ -24,6 +24,7 @@
 
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/singleton.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/hdr_histogram.h"
@@ -44,9 +45,50 @@ METRIC_DEFINE_entity(server);
 namespace kudu {
 
 using std::string;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+template<typename Collection>
+void WriteMetricsToJson(JsonWriter* writer,
+                        const Collection& metrics,
+                        const MetricJsonOptions& opts) {
+  writer->String("metrics");
+  writer->StartArray();
+  for (const auto& val : metrics) {
+    const auto& m = val.second;
+    if (m->ModifiedInOrAfterEpoch(opts.only_modified_in_or_after_epoch)) {
+      if (!opts.include_untouched_metrics && m->IsUntouched()) {
+        continue;
+      }
+      WARN_NOT_OK(m->WriteAsJson(writer, opts),
+                  Substitute("Failed to write $0 as JSON", val.first->name()));
+    }
+  }
+  writer->EndArray();
+}
+
+void WriteToJson(JsonWriter* writer,
+                 const MergedEntityMetrics &merged_entity_metrics,
+                 const MetricJsonOptions &opts) {
+  for (const auto& entity_metrics : merged_entity_metrics) {
+    if (entity_metrics.second.empty()) {
+      continue;
+    }
+    writer->StartObject();
+
+    writer->String("type");
+    writer->String(entity_metrics.first.type_);
+
+    writer->String("id");
+    writer->String(entity_metrics.first.id_);
+
+    WriteMetricsToJson(writer, entity_metrics.second, opts);
+
+    writer->EndObject();
+  }
+}
+
 //
 // MetricUnit
 //
@@ -151,7 +193,7 @@ MetricEntityPrototype::~MetricEntityPrototype() {
 
 scoped_refptr<MetricEntity> MetricEntityPrototype::Instantiate(
     MetricRegistry* registry,
-    const std::string& id,
+    const string& id,
     const MetricEntity::AttributeMap& initial_attrs) const {
   return registry->FindOrCreateEntity(this, id, initial_attrs);
 }
@@ -161,7 +203,7 @@ scoped_refptr<MetricEntity> MetricEntityPrototype::Instantiate(
 //
 
 MetricEntity::MetricEntity(const MetricEntityPrototype* prototype,
-                           std::string id, AttributeMap attributes)
+                           string id, AttributeMap attributes)
     : prototype_(prototype),
       id_(std::move(id)),
       attributes_(std::move(attributes)),
@@ -200,33 +242,37 @@ bool MatchNameInList(const string& name, const vector<string>& names) {
 
 } // anonymous namespace
 
-Status MetricEntity::WriteAsJson(JsonWriter* writer, const MetricJsonOptions& opts) const {
+Status MetricEntity::GetMetricsAndAttrs(const MetricFilters& filters,
+                                        MetricMap* metrics,
+                                        AttributeMap* attrs) const {
+  CHECK(metrics);
+  CHECK(attrs);
+
   // Filter the 'type'.
-  if (!opts.entity_types.empty() && !MatchNameInList(prototype_->name(), opts.entity_types)) {
-    return Status::OK();
+  if (!filters.entity_types.empty() && !MatchNameInList(prototype_->name(), filters.entity_types)) {
+    return Status::NotFound("entity is filtered by entity type");
   }
+
   // Filter the 'id'.
-  if (!opts.entity_ids.empty() && !MatchNameInList(id_, opts.entity_ids)) {
-    return Status::OK();
+  if (!filters.entity_ids.empty() && !MatchNameInList(id_, filters.entity_ids)) {
+    return Status::NotFound("entity is filtered by entity id");
   }
 
-  MetricMap metrics;
-  AttributeMap attrs;
   {
     // Snapshot the metrics in this registry (not guaranteed to be a consistent snapshot)
     std::lock_guard<simple_spinlock> l(lock_);
-    attrs = attributes_;
-    metrics = metric_map_;
+    *attrs = attributes_;
+    *metrics = metric_map_;
   }
 
   // Filter the 'attributes'.
-  if (!opts.entity_attrs.empty()) {
+  if (!filters.entity_attrs.empty()) {
     bool match_attrs = false;
-    DCHECK(opts.entity_attrs.size() % 2 == 0);
-    for (int i = 0; i < opts.entity_attrs.size(); i += 2) {
+    DCHECK(filters.entity_attrs.size() % 2 == 0);
+    for (int i = 0; i < filters.entity_attrs.size(); i += 2) {
       // The attr_key can't be found or the attr_val can't be matched.
-      AttributeMap::const_iterator it = attrs.find(opts.entity_attrs[i]);
-      if (it == attrs.end() || !MatchNameInList(it->second, { opts.entity_attrs[i+1] })) {
+      AttributeMap::const_iterator it = attrs->find(filters.entity_attrs[i]);
+      if (it == attrs->end() || !MatchNameInList(it->second, { filters.entity_attrs[i+1] })) {
         continue;
       }
       match_attrs = true;
@@ -234,25 +280,38 @@ Status MetricEntity::WriteAsJson(JsonWriter* writer, const MetricJsonOptions& op
     }
     // None of them match.
     if (!match_attrs) {
-      return Status::OK();
+      return Status::NotFound("entity is filtered by some attribute");
     }
   }
 
   // Filter the 'metrics'.
-  if (!opts.entity_metrics.empty()) {
-    for (auto metric = metrics.begin(); metric != metrics.end();) {
-      if (!MatchNameInList(metric->first->name(), opts.entity_metrics)) {
-        metric = metrics.erase(metric);
+  if (!filters.entity_metrics.empty()) {
+    for (auto metric = metrics->begin(); metric != metrics->end();) {
+      if (!MatchNameInList(metric->first->name(), filters.entity_metrics)) {
+        metric = metrics->erase(metric);
       } else {
         ++metric;
       }
     }
     // None of them match.
-    if (metrics.empty()) {
-      return Status::OK();
+    if (metrics->empty()) {
+      return Status::NotFound("entity is filtered by metric types");
     }
   }
 
+  return Status::OK();
+}
+
+Status MetricEntity::WriteAsJson(JsonWriter* writer, const MetricJsonOptions& opts) const {
+  MetricMap metrics;
+  AttributeMap attrs;
+  Status s = GetMetricsAndAttrs(opts.filters, &metrics, &attrs);
+  if (s.IsNotFound()) {
+    // Status::NotFound is returned when this entity has been filtered out; treat
+    // it as OK and skip printing it.
+    return Status::OK();
+  }
+
   writer->StartObject();
 
   writer->String("type");
@@ -271,25 +330,51 @@ Status MetricEntity::WriteAsJson(JsonWriter* writer, const MetricJsonOptions& op
     writer->EndObject();
   }
 
-  writer->String("metrics");
-  writer->StartArray();
-  for (MetricMap::value_type& val : metrics) {
-    const auto& m = val.second;
-    if (m->ModifiedInOrAfterEpoch(opts.only_modified_in_or_after_epoch)) {
-      if (!opts.include_untouched_metrics && m->IsUntouched()) {
-        continue;
-      }
-      WARN_NOT_OK(m->WriteAsJson(writer, opts),
-        strings::Substitute("Failed to write $0 as JSON", val.first->name()));
-    }
-  }
-  writer->EndArray();
+  WriteMetricsToJson(writer, metrics, opts);
 
   writer->EndObject();
 
   return Status::OK();
 }
 
+Status MetricEntity::CollectTo(MergedEntityMetrics* collections,
+                               const MetricFilters& filters,
+                               const MetricMergeRules& merge_rules) const {
+  MetricMap metrics;
+  AttributeMap attrs;
+  Status s = GetMetricsAndAttrs(filters, &metrics, &attrs);
+  if (s.IsNotFound()) {
+    // Status::NotFound is returned when this entity has been filtered out; treat
+    // it as OK and skip collecting it.
+    return Status::OK();
+  }
+
+  string entity_type = prototype_->name();
+  string entity_id = id();
+  auto* merge_rule = ::FindOrNull(merge_rules, prototype_->name());
+  if (merge_rule) {
+    entity_type = merge_rule->merge_to;
+    entity_id = attrs[merge_rule->attribute_to_merge_by];
+  }
+
+  MergedEntity e(entity_type, entity_id);
+  auto& table_collection = collections->emplace(std::make_pair(e, MergedMetrics())).first->second;
+  for (const auto& val : metrics) {
+    const MetricPrototype* prototype = val.first;
+    const scoped_refptr<Metric>& metric = val.second;
+
+    scoped_refptr<Metric> entry = FindPtrOrNull(table_collection, prototype);
+    if (!entry) {
+      scoped_refptr<Metric> new_metric = metric->snapshot();
+      InsertOrDie(&table_collection, new_metric->prototype(), new_metric);
+    } else {
+      entry->MergeFrom(metric);
+    }
+  }
+
+  return Status::OK();
+}
+
 void MetricEntity::RetireOldMetrics() {
   MonoTime now(MonoTime::Now());
 
@@ -329,7 +414,6 @@ void MetricEntity::RetireOldMetrics() {
       continue;
     }
 
-
     VLOG(2) << "Retiring metric " << it->first;
     metric_map_.erase(it++);
   }
@@ -368,9 +452,18 @@ Status MetricRegistry::WriteAsJson(JsonWriter* writer, const MetricJsonOptions&
   }
 
   writer->StartArray();
-  for (const auto& e : entities) {
-    WARN_NOT_OK(e.second->WriteAsJson(writer, opts),
-                Substitute("Failed to write entity $0 as JSON", e.second->id()));
+  if (opts.merge_rules.empty()) {
+    for (const auto& e : entities) {
+      WARN_NOT_OK(e.second->WriteAsJson(writer, opts),
+                  Substitute("Failed to write entity $0 as JSON", e.second->id()));
+    }
+  } else {
+    MergedEntityMetrics collections;
+    for (const auto& e : entities) {
+      WARN_NOT_OK(e.second->CollectTo(&collections, opts.filters, opts.merge_rules),
+                  Substitute("Failed to collect entity $0", e.second->id()));
+    }
+    WriteToJson(writer, collections, opts);
   }
   writer->EndArray();
 
@@ -501,18 +594,18 @@ FunctionGaugeDetacher::~FunctionGaugeDetacher() {
 
 scoped_refptr<MetricEntity> MetricRegistry::FindOrCreateEntity(
     const MetricEntityPrototype* prototype,
-    const std::string& id,
-    const MetricEntity::AttributeMap& initial_attributes) {
+    const string& id,
+    const MetricEntity::AttributeMap& initial_attrs) {
   std::lock_guard<simple_spinlock> l(lock_);
   scoped_refptr<MetricEntity> e = FindPtrOrNull(entities_, id);
   if (!e) {
-    e = new MetricEntity(prototype, id, initial_attributes);
+    e = new MetricEntity(prototype, id, initial_attrs);
     InsertOrDie(&entities_, id, e);
   } else if (!e->published()) {
-    e = new MetricEntity(prototype, id, initial_attributes);
+    e = new MetricEntity(prototype, id, initial_attrs);
     entities_[id] = e;
   } else {
-    e->SetAttributes(initial_attributes);
+    e->SetAttributes(initial_attrs);
   }
   return e;
 }
@@ -568,18 +661,61 @@ Status Gauge::WriteAsJson(JsonWriter* writer,
 //
 
 StringGauge::StringGauge(const GaugePrototype<string>* proto,
-                         string initial_value)
-    : Gauge(proto), value_(std::move(initial_value)) {}
+                         string initial_value,
+                         unordered_set<string> initial_unique_values)
+    : Gauge(proto),
+      value_(std::move(initial_value)),
+      unique_values_(std::move(initial_unique_values)) {}
+
+scoped_refptr<Metric> StringGauge::snapshot() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  scoped_refptr<Metric> m
+    = new StringGauge(down_cast<const GaugePrototype<string>*>(prototype_),
+                      value_,
+                      unique_values_);
+  return m;
+}
+
+string StringGauge::value() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  if (PREDICT_TRUE(unique_values_.empty())) {
+    return value_;
+  }
+  return JoinStrings(unique_values_, ", ");
+}
+
+void StringGauge::FillUniqueValuesUnlocked() {
+  if (unique_values_.empty()) {
+    unique_values_.insert(value_);
+  }
+}
 
-std::string StringGauge::value() const {
+unordered_set<string> StringGauge::unique_values() {
   std::lock_guard<simple_spinlock> l(lock_);
-  return value_;
+  FillUniqueValuesUnlocked();
+  return unique_values_;
 }
 
-void StringGauge::set_value(const std::string& value) {
+void StringGauge::set_value(const string& value) {
   UpdateModificationEpoch();
   std::lock_guard<simple_spinlock> l(lock_);
   value_ = value;
+  unique_values_.clear();
+}
+
+bool StringGauge::MergeFrom(const scoped_refptr<Metric>& other) {
+  if (PREDICT_FALSE(this == other.get())) {
+    return true;
+  }
+  UpdateModificationEpoch();
+
+  scoped_refptr<StringGauge> other_ptr = down_cast<StringGauge*>(other.get());
+  auto other_values = other_ptr->unique_values();
+
+  std::lock_guard<simple_spinlock> l(lock_);
+  FillUniqueValuesUnlocked();
+  unique_values_.insert(other_values.begin(), other_values.end());
+  return true;
 }
 
 void StringGauge::WriteValue(JsonWriter* writer) const {
@@ -656,6 +792,11 @@ Histogram::Histogram(const HistogramPrototype* proto)
     histogram_(new HdrHistogram(proto->max_trackable_value(), proto->num_sig_digits())) {
 }
 
+Histogram::Histogram(const HistogramPrototype* proto, const HdrHistogram& hdr_hist)
+  : Metric(proto),
+    histogram_(new HdrHistogram(hdr_hist)) {
+}
+
 void Histogram::Increment(int64_t value) {
   UpdateModificationEpoch();
   histogram_->Increment(value);
diff --git a/src/kudu/util/metrics.h b/src/kudu/util/metrics.h
index 13cc834..a64a735 100644
--- a/src/kudu/util/metrics.h
+++ b/src/kudu/util/metrics.h
@@ -223,6 +223,8 @@
 //
 /////////////////////////////////////////////////////
 
+#include <cstring>
+
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
@@ -230,6 +232,8 @@
 #include <mutex>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include <gtest/gtest_prod.h>
@@ -329,27 +333,21 @@ namespace kudu {
 
 class Counter;
 class CounterPrototype;
-
-template<typename T>
-class AtomicGauge;
+class Histogram;
+class HistogramPrototype;
+class HistogramSnapshotPB;
+class Metric;
+class MetricEntity;
+class MetricEntityPrototype;
+class MetricRegistry;
 template <typename Sig>
 class Callback;
 template<typename T>
+class AtomicGauge;
+template<typename T>
 class FunctionGauge;
 template<typename T>
 class GaugePrototype;
-
-class Metric;
-class MetricEntityPrototype;
-class MetricPrototype;
-class MetricRegistry;
-
-class Histogram;
-class HistogramPrototype;
-class HistogramSnapshotPB;
-
-class MetricEntity;
-
 } // namespace kudu
 
 // Forward-declare the generic 'server' entity type.
@@ -406,6 +404,36 @@ class MetricType {
   static const char* const kHistogramType;
 };
 
+struct MetricFilters {
+  // A set of substrings to filter entity against, where empty matches all.
+  //
+  // entity type.
+  std::vector<std::string> entity_types;
+  // entity id.
+  std::vector<std::string> entity_ids;
+  // entity attributes.
+  //
+  // Note that the use of attribute filters is a little bit different. The
+  // number of entries should always be even because each pair represents a
+  // key and a value. For example: attributes=k1,v1,k1,v2,k2,v3, that means
+  // the attribute object is matched when one of these filters is satisfied.
+  std::vector<std::string> entity_attrs;
+  // entity metrics.
+  std::vector<std::string> entity_metrics;
+};
+
+struct MergeAttributes {
+  MergeAttributes(std::string to, std::string by)
+    : merge_to(std::move(to)), attribute_to_merge_by(std::move(by)) {
+  }
+  // New merged entity has the prototype name of 'merge_to'.
+  std::string merge_to;
+  // Entities with the same 'attribute_to_merge_by' attribute will be merged.
+  std::string attribute_to_merge_by;
+};
+
+// Entity prototype name -> MergeAttributes.
+typedef std::unordered_map<std::string, MergeAttributes> MetricMergeRules;
 struct MetricJsonOptions {
   MetricJsonOptions() :
     include_raw_histograms(false),
@@ -442,21 +470,15 @@ struct MetricJsonOptions {
   // Whether to include the attributes of each entity.
   bool include_entity_attributes;
 
-  // A set of substrings to filter entity against, where empty matches all.
-  //
-  // entity type.
-  std::vector<std::string> entity_types;
-  // entity id.
-  std::vector<std::string> entity_ids;
-  // entity attributes.
-  //
-  // Note that the use of attribute filters is a little bit different. The
-  // number of entries should always be even because each pair represents a
-  // key and a value. For example: attributes=k1,v1,k1,v2,k2,v3, that means
-  // the attribute object is matched when one of these filters is satisfied.
-  std::vector<std::string> entity_attrs;
-  // entity metrics.
-  std::vector<std::string> entity_metrics;
+  // Metrics will be filtered by 'filters', see MetricFilters for more details.
+  MetricFilters filters;
+
+  // Entities whose prototype name is in merge_rules's key set will be merged
+  // to a new entity. See struct MergeAttributes for more merge details.
+  // NOTE: Entities which have been merged will not be output.
+  // NOTE: Entities whose prototype name is NOT in merge_rules's key set will
+  // not be merged.
+  MetricMergeRules merge_rules;
 };
 
 class MetricEntityPrototype {
@@ -486,6 +508,99 @@ class MetricEntityPrototype {
   DISALLOW_COPY_AND_ASSIGN(MetricEntityPrototype);
 };
 
+class MetricPrototype {
+ public:
+  // Simple struct to aggregate the arguments common to all prototypes.
+  // This makes constructor chaining a little less tedious.
+  struct CtorArgs {
+    CtorArgs(const char* entity_type,
+             const char* name,
+             const char* label,
+             MetricUnit::Type unit,
+             const char* description,
+             uint32_t flags = 0)
+      : entity_type_(entity_type),
+        name_(name),
+        label_(label),
+        unit_(unit),
+        description_(description),
+        flags_(flags) {
+    }
+
+    const char* const entity_type_;
+    const char* const name_;
+    const char* const label_;
+    const MetricUnit::Type unit_;
+    const char* const description_;
+    const uint32_t flags_;
+  };
+
+  const char* entity_type() const { return args_.entity_type_; }
+  const char* name() const { return args_.name_; }
+  const char* label() const { return args_.label_; }
+  MetricUnit::Type unit() const { return args_.unit_; }
+  const char* description() const { return args_.description_; }
+  virtual MetricType::Type type() const = 0;
+
+  // Writes the fields of this prototype to the given JSON writer.
+  void WriteFields(JsonWriter* writer,
+                   const MetricJsonOptions& opts) const;
+
+ protected:
+  explicit MetricPrototype(CtorArgs args);
+  virtual ~MetricPrototype() {
+  }
+
+  const CtorArgs args_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(MetricPrototype);
+};
+
+struct MetricPrototypeHash {
+  size_t operator()(const MetricPrototype* metric_prototype) const {
+    return std::hash<const char*>()(metric_prototype->name());
+  }
+};
+
+struct MetricPrototypeEqualTo {
+  bool operator()(const MetricPrototype* first, const MetricPrototype* second) const {
+    return strcmp(first->name(), second->name()) == 0;
+  }
+};
+
+// A struct to indicate a merged metric entity.
+struct MergedEntity {
+  MergedEntity(std::string type, std::string id)
+    : type_(std::move(type)), id_(std::move(id)) {}
+  // A higher-level concept than the type of a MetricEntity.
+  std::string type_;
+  // ID to distinguish the same type of objects.
+  std::string id_;
+};
+
+struct MergedEntityHash {
+  size_t operator()(const MergedEntity& entity) const {
+    return std::hash<std::string>()(entity.type_ + entity.id_);
+  }
+};
+
+struct MergedEntityEqual {
+  bool operator()(const MergedEntity& first, const MergedEntity& second) const {
+    return first.type_ == second.type_ && first.id_ == second.id_;
+  }
+};
+
+typedef std::unordered_map<const MetricPrototype*,
+                           scoped_refptr<Metric>,
+                           MetricPrototypeHash,
+                           MetricPrototypeEqualTo> MergedMetrics;
+
+typedef std::unordered_map<MergedEntity,
+                           MergedMetrics,
+                           MergedEntityHash,
+                           MergedEntityEqual> MergedEntityMetrics;
+
 class MetricEntity : public RefCountedThreadSafe<MetricEntity> {
  public:
   typedef std::unordered_map<const MetricPrototype*, scoped_refptr<Metric> > MetricMap;
@@ -511,6 +626,12 @@ class MetricEntity : public RefCountedThreadSafe<MetricEntity> {
   // See MetricRegistry::WriteAsJson()
   Status WriteAsJson(JsonWriter* writer, const MetricJsonOptions& opts) const;
 
+  // Collect metrics of this entity to 'collections'. Metrics will be filtered by 'filters',
+  // and will be merged under the rule of 'merge_rules'.
+  Status CollectTo(MergedEntityMetrics* collections,
+                   const MetricFilters& filters,
+                   const MetricMergeRules& merge_rules) const;
+
   const MetricMap& UnsafeMetricsMapForTests() const { return metric_map_; }
 
   // Mark that the given metric should never be retired until the metric
@@ -562,6 +683,13 @@ class MetricEntity : public RefCountedThreadSafe<MetricEntity> {
   // type defined within the metric prototype.
   void CheckInstantiation(const MetricPrototype* proto) const;
 
+  // Gets a snapshot of the entity's metrics as well as the entity's attributes,
+  // possibly filtered by 'filters'; see the MetricFilters struct for details.
+  // Returns Status::NotFound when the entity has been filtered out, or Status::OK on success.
+  Status GetMetricsAndAttrs(const MetricFilters& filters,
+                            MetricMap* metrics,
+                            AttributeMap* attrs) const;
+
   const MetricEntityPrototype* const prototype_;
   const std::string id_;
 
@@ -584,6 +712,8 @@ class MetricEntity : public RefCountedThreadSafe<MetricEntity> {
 // See documentation at the top of this file for information on metrics ownership.
 class Metric : public RefCountedThreadSafe<Metric> {
  public:
+  // Takes a snapshot of this metric as a new metric with the same attributes and metric value.
+  virtual scoped_refptr<Metric> snapshot() const = 0;
   // All metrics must be able to render themselves as JSON.
   virtual Status WriteAsJson(JsonWriter* writer,
                              const MetricJsonOptions& opts) const = 0;
@@ -611,6 +741,11 @@ class Metric : public RefCountedThreadSafe<Metric> {
   // metrics).
   static void IncrementEpoch();
 
+  // Merges 'other' into this Metric object.
+  // Returns false if any error occurs, otherwise returns true.
+  // NOTE: Merging a metric with itself does nothing and returns true directly.
+  virtual bool MergeFrom(const scoped_refptr<Metric>& other) = 0;
+
  protected:
   explicit Metric(const MetricPrototype* prototype);
   virtual ~Metric();
@@ -734,55 +869,6 @@ enum PrototypeFlags {
   EXPOSE_AS_COUNTER = 1 << 0
 };
 
-class MetricPrototype {
- public:
-  // Simple struct to aggregate the arguments common to all prototypes.
-  // This makes constructor chaining a little less tedious.
-  struct CtorArgs {
-    CtorArgs(const char* entity_type,
-             const char* name,
-             const char* label,
-             MetricUnit::Type unit,
-             const char* description,
-             uint32_t flags = 0)
-      : entity_type_(entity_type),
-        name_(name),
-        label_(label),
-        unit_(unit),
-        description_(description),
-        flags_(flags) {
-    }
-
-    const char* const entity_type_;
-    const char* const name_;
-    const char* const label_;
-    const MetricUnit::Type unit_;
-    const char* const description_;
-    const uint32_t flags_;
-  };
-
-  const char* entity_type() const { return args_.entity_type_; }
-  const char* name() const { return args_.name_; }
-  const char* label() const { return args_.label_; }
-  MetricUnit::Type unit() const { return args_.unit_; }
-  const char* description() const { return args_.description_; }
-  virtual MetricType::Type type() const = 0;
-
-  // Writes the fields of this prototype to the given JSON writer.
-  void WriteFields(JsonWriter* writer,
-                   const MetricJsonOptions& opts) const;
-
- protected:
-  explicit MetricPrototype(CtorArgs args);
-  virtual ~MetricPrototype() {
-  }
-
-  const CtorArgs args_;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(MetricPrototype);
-};
-
 // A description of a Gauge.
 template<typename T>
 class GaugePrototype : public MetricPrototype {
@@ -837,18 +923,26 @@ class Gauge : public Metric {
 class StringGauge : public Gauge {
  public:
   StringGauge(const GaugePrototype<std::string>* proto,
-              std::string initial_value);
+              std::string initial_value,
+              std::unordered_set<std::string> initial_unique_values
+                  = std::unordered_set<std::string>());
+  scoped_refptr<Metric> snapshot() const OVERRIDE;
   std::string value() const;
   void set_value(const std::string& value);
   virtual bool IsUntouched() const override {
     return false;
   }
+  bool MergeFrom(const scoped_refptr<Metric>& other) OVERRIDE;
 
  protected:
+  FRIEND_TEST(MetricsTest, SimpleStringGaugeForMergeTest);
   virtual void WriteValue(JsonWriter* writer) const OVERRIDE;
+  void FillUniqueValuesUnlocked();
+  std::unordered_set<std::string> unique_values();
  private:
   std::string value_;
-  mutable simple_spinlock lock_;  // Guards value_
+  std::unordered_set<std::string> unique_values_;
+  mutable simple_spinlock lock_;  // Guards value_ and unique_values_
   DISALLOW_COPY_AND_ASSIGN(StringGauge);
 };
 
@@ -860,6 +954,11 @@ class AtomicGauge : public Gauge {
     : Gauge(proto),
       value_(initial_value) {
   }
+  scoped_refptr<Metric> snapshot() const override {
+    scoped_refptr<Metric> m = new AtomicGauge(down_cast<const GaugePrototype<T>*>(prototype_),
+                                              value());
+    return m;
+  }
   T value() const {
     return static_cast<T>(value_.Load(kMemOrderRelease));
   }
@@ -883,6 +982,12 @@ class AtomicGauge : public Gauge {
   virtual bool IsUntouched() const override {
     return false;
   }
+  bool MergeFrom(const scoped_refptr<Metric>& other) override {
+    if (PREDICT_TRUE(this != other.get())) {
+      IncrementBy(down_cast<AtomicGauge<T>*>(other.get())->value());
+    }
+    return true;
+  }
  protected:
   virtual void WriteValue(JsonWriter* writer) const OVERRIDE {
     writer->Value(value());
@@ -953,6 +1058,16 @@ class FunctionGaugeDetacher {
 template <typename T>
 class FunctionGauge : public Gauge {
  public:
+  scoped_refptr<Metric> snapshot() const override {
+    scoped_refptr<Metric> m = new FunctionGauge(down_cast<const GaugePrototype<T>*>(prototype_),
+                                                Callback<T()>(function_));
+    // The bound function is associated with another MetricEntity instance and we don't know
+    // when that instance will be released, so it's not safe to keep the function as a member.
+    // It's therefore necessary to call DetachToCurrentValue() to make the snapshot safe.
+    down_cast<FunctionGauge<T>*>(m.get())->DetachToCurrentValue();
+    return m;
+  }
+
   T value() const {
     std::lock_guard<simple_spinlock> l(lock_);
     return function_.Run();
@@ -1002,6 +1117,14 @@ class FunctionGauge : public Gauge {
     return false;
   }
 
+  // value() will be constant after MergeFrom()
+  bool MergeFrom(const scoped_refptr<Metric>& other) override {
+    if (PREDICT_TRUE(this != other.get())) {
+      DetachToConstant(value() + down_cast<FunctionGauge<T>*>(other.get())->value());
+    }
+    return true;
+  }
+
  private:
   friend class MetricEntity;
 
@@ -1043,6 +1166,11 @@ class CounterPrototype : public MetricPrototype {
 // across multiple servers, etc, which aren't appropriate in the case of gauges.
 class Counter : public Metric {
  public:
+  scoped_refptr<Metric> snapshot() const override {
+    scoped_refptr<Metric> m = new Counter(down_cast<const CounterPrototype*>(prototype_));
+    down_cast<Counter*>(m.get())->IncrementBy(value());
+    return m;
+  }
   int64_t value() const;
   void Increment();
   void IncrementBy(int64_t amount);
@@ -1053,8 +1181,16 @@ class Counter : public Metric {
     return value() == 0;
   }
 
+  bool MergeFrom(const scoped_refptr<Metric>& other) override {
+    if (PREDICT_TRUE(this != other.get())) {
+      IncrementBy(down_cast<Counter*>(other.get())->value());
+    }
+    return true;
+  }
+
  private:
   FRIEND_TEST(MetricsTest, SimpleCounterTest);
+  FRIEND_TEST(MetricsTest, SimpleCounterMergeTest);
   FRIEND_TEST(MultiThreadedMetricsTest, CounterIncrementTest);
   friend class MetricEntity;
 
@@ -1082,6 +1218,12 @@ class HistogramPrototype : public MetricPrototype {
 
 class Histogram : public Metric {
  public:
+  scoped_refptr<Metric> snapshot() const override {
+    scoped_refptr<Metric> m = new Histogram(down_cast<const HistogramPrototype*>(prototype_),
+                                              *histogram_);
+    return m;
+  }
+
   // Increment the histogram for the given value.
   // 'value' must be non-negative.
   void Increment(int64_t value);
@@ -1114,10 +1256,17 @@ class Histogram : public Metric {
     return TotalCount() == 0;
   }
 
+  bool MergeFrom(const scoped_refptr<Metric>& other) override {
+    if (PREDICT_TRUE(this != other.get())) {
+      histogram_->MergeFrom(*(down_cast<Histogram*>(other.get())->histogram()));
+    }
+    return true;
+  }
+
  private:
-  FRIEND_TEST(MetricsTest, SimpleHistogramTest);
   friend class MetricEntity;
   explicit Histogram(const HistogramPrototype* proto);
+  Histogram(const HistogramPrototype* proto, const HdrHistogram& hdr_hist);
 
   const gscoped_ptr<HdrHistogram> histogram_;
   DISALLOW_COPY_AND_ASSIGN(Histogram);