You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/12/18 02:29:56 UTC

[kudu] branch master updated: [metrics] Support SUM, MAX and MIN merge types for metrics

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 51ae0ea  [metrics] Support SUM, MAX and MIN merge types for metrics
51ae0ea is described below

commit 51ae0ea4cfcf04365876289b1067f7c51921a96d
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Thu Dec 12 18:15:52 2019 +0800

    [metrics] Support SUM, MAX and MIN merge types for metrics
    
    Now metrics only support SUM semantic when merge, some type of
    metrics like timestamp, may need MAX or MIN semantic when merge.
    
    Change-Id: I1acad40c2b73cfeb16e0dc4ad5ffba4f19167c81
    Reviewed-on: http://gerrit.cloudera.org:8080/14903
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/util/metrics-test.cc | 39 ++++++++++++++++--
 src/kudu/util/metrics.h       | 96 ++++++++++++++++++++++++++++++++-----------
 2 files changed, 109 insertions(+), 26 deletions(-)

diff --git a/src/kudu/util/metrics-test.cc b/src/kudu/util/metrics-test.cc
index 1a5f1ed..081f294 100644
--- a/src/kudu/util/metrics-test.cc
+++ b/src/kudu/util/metrics-test.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/util/metrics.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <map>
 #include <memory>
@@ -221,11 +222,11 @@ TEST_F(MetricsTest, SimpleAtomicGaugeTest) {
   ASSERT_EQ(5, mem_usage->value());
 }
 
-TEST_F(MetricsTest, SimpleAtomicGaugeMergeTest) {
+TEST_F(MetricsTest, SimpleAtomicGaugeSumTypeMergeTest) {
   scoped_refptr<AtomicGauge<uint64_t> > mem_usage =
-    METRIC_test_gauge.Instantiate(entity_, 2);
+      METRIC_test_gauge.Instantiate(entity_, 2);
   scoped_refptr<AtomicGauge<uint64_t> > mem_usage_for_merge =
-    METRIC_test_gauge.Instantiate(entity_same_attr_, 3);
+      METRIC_test_gauge.Instantiate(entity_same_attr_, 3);
   mem_usage_for_merge->MergeFrom(mem_usage);
   ASSERT_EQ(2, mem_usage->value());
   ASSERT_EQ(5, mem_usage_for_merge->value());
@@ -237,6 +238,38 @@ TEST_F(MetricsTest, SimpleAtomicGaugeMergeTest) {
   ASSERT_EQ(14, mem_usage_for_merge->value());
 }
 
+TEST_F(MetricsTest, SimpleAtomicGaugeMaxTypeMergeTest) {
+  scoped_refptr<AtomicGauge<uint64_t> > stop_time =
+      METRIC_test_gauge.Instantiate(entity_, 2, MergeType::kMax);
+  scoped_refptr<AtomicGauge<uint64_t> > stop_time_for_merge =
+      METRIC_test_gauge.Instantiate(entity_same_attr_, 3, MergeType::kMax);
+  stop_time_for_merge->MergeFrom(stop_time);
+  ASSERT_EQ(2, stop_time->value());
+  ASSERT_EQ(3, stop_time_for_merge->value());
+  stop_time->IncrementBy(7);
+  stop_time_for_merge->MergeFrom(stop_time);
+  ASSERT_EQ(9, stop_time->value());
+  ASSERT_EQ(9, stop_time_for_merge->value());
+  stop_time_for_merge->MergeFrom(stop_time_for_merge);
+  ASSERT_EQ(9, stop_time_for_merge->value());
+}
+
+TEST_F(MetricsTest, SimpleAtomicGaugeMinTypeMergeTest) {
+  scoped_refptr<AtomicGauge<uint64_t> > start_time =
+      METRIC_test_gauge.Instantiate(entity_, 3, MergeType::kMin);
+  scoped_refptr<AtomicGauge<uint64_t> > start_time_for_merge =
+      METRIC_test_gauge.Instantiate(entity_same_attr_, 5, MergeType::kMin);
+  start_time_for_merge->MergeFrom(start_time);
+  ASSERT_EQ(3, start_time->value());
+  ASSERT_EQ(3, start_time_for_merge->value());
+  start_time->DecrementBy(2);
+  start_time_for_merge->MergeFrom(start_time);
+  ASSERT_EQ(1, start_time->value());
+  ASSERT_EQ(1, start_time_for_merge->value());
+  start_time_for_merge->MergeFrom(start_time_for_merge);
+  ASSERT_EQ(1, start_time_for_merge->value());
+}
+
 METRIC_DEFINE_gauge_int64(test_entity, test_func_gauge, "Test Function Gauge",
                           MetricUnit::kBytes, "Test Gauge 2",
                           kudu::MetricLevel::kInfo);
diff --git a/src/kudu/util/metrics.h b/src/kudu/util/metrics.h
index 21683f2..20f8455 100644
--- a/src/kudu/util/metrics.h
+++ b/src/kudu/util/metrics.h
@@ -226,6 +226,7 @@
 //
 /////////////////////////////////////////////////////
 
+#include <algorithm>
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
@@ -430,6 +431,17 @@ enum class MetricLevel {
   kWarn = 2
 };
 
+// Type of behavior when two metrics merge together, it only take effect on the result
+// of MergeFrom.
+enum class MergeType {
+  // Set the result as the sum of the two metrics.
+  kSum = 0,
+  // Set the result as the maximum one of the two metrics.
+  kMax = 1,
+  // Set the result as the minimum one of the two metrics.
+  kMin = 2
+};
+
 struct MetricFilters {
   // A set of substrings to filter entity against, where empty matches all.
   //
@@ -643,13 +655,15 @@ class MetricEntity : public RefCountedThreadSafe<MetricEntity> {
 
   template<typename T>
   scoped_refptr<AtomicGauge<T> > FindOrCreateGauge(const GaugePrototype<T>* proto,
-                                                   const T& initial_value);
+                                                   const T& initial_value,
+                                                   MergeType type = MergeType::kSum);
 
   scoped_refptr<MeanGauge> FindOrCreateMeanGauge(const GaugePrototype<double>* proto);
 
   template<typename T>
   scoped_refptr<FunctionGauge<T> > FindOrCreateFunctionGauge(const GaugePrototype<T>* proto,
-                                                             const Callback<T()>& function);
+                                                             const Callback<T()>& function,
+                                                             MergeType type = MergeType::kSum);
 
   // Return the metric instantiated from the given prototype, or NULL if none has been
   // instantiated. Primarily used by tests trying to read metric values.
@@ -949,8 +963,8 @@ class GaugePrototype : public MetricPrototype {
   // Instantiate a "manual" gauge.
   scoped_refptr<AtomicGauge<T> > Instantiate(
       const scoped_refptr<MetricEntity>& entity,
-      const T& initial_value) const {
-    return entity->FindOrCreateGauge(this, initial_value);
+      const T& initial_value, MergeType type = MergeType::kSum) const {
+    return entity->FindOrCreateGauge(this, initial_value, type);
   }
 
   scoped_refptr<MeanGauge> InstantiateMeanGauge(
@@ -961,16 +975,18 @@ class GaugePrototype : public MetricPrototype {
   // Instantiate a gauge that is backed by the given callback.
   scoped_refptr<FunctionGauge<T> > InstantiateFunctionGauge(
       const scoped_refptr<MetricEntity>& entity,
-      const Callback<T()>& function) const {
-    return entity->FindOrCreateFunctionGauge(this, function);
+      const Callback<T()>& function,
+      MergeType type = MergeType::kSum) const {
+    return entity->FindOrCreateFunctionGauge(this, function, type);
   }
 
   // Instantiate a "manual" gauge and hide it. It will appear
   // when its value is updated, or when its entity is merged.
   scoped_refptr<AtomicGauge<T> > InstantiateHidden(
       const scoped_refptr<MetricEntity>& entity,
-      const T& initial_value) const {
-    auto gauge = Instantiate(entity, initial_value);
+      const T& initial_value,
+      MergeType type = MergeType::kSum) const {
+    auto gauge = Instantiate(entity, initial_value, type);
     gauge->InvalidateEpoch();
     return gauge;
   }
@@ -979,8 +995,9 @@ class GaugePrototype : public MetricPrototype {
   // invalidate the result when merge with other metric.
   scoped_refptr<AtomicGauge<T> > InstantiateInvalid(
       const scoped_refptr<MetricEntity>& entity,
-      const T& initial_value) const {
-    auto gauge = InstantiateHidden(entity, initial_value);
+      const T& initial_value,
+      MergeType type = MergeType::kSum) const {
+    auto gauge = InstantiateHidden(entity, initial_value, type);
     gauge->InvalidateForMerge();
     return gauge;
   }
@@ -1071,12 +1088,13 @@ class MeanGauge : public Gauge {
 template <typename T>
 class AtomicGauge : public Gauge {
  public:
-  AtomicGauge(const GaugePrototype<T>* proto, T initial_value)
+  AtomicGauge(const GaugePrototype<T>* proto, T initial_value, MergeType type)
     : Gauge(proto),
-      value_(initial_value) {
+      value_(initial_value),
+      type_(type) {
   }
   scoped_refptr<Metric> snapshot() const override {
-    auto p = new AtomicGauge(down_cast<const GaugePrototype<T>*>(prototype_), value());
+    auto p = new AtomicGauge(down_cast<const GaugePrototype<T>*>(prototype_), value(), type_);
     p->m_epoch_.store(m_epoch_);
     p->invalid_for_merge_ = invalid_for_merge_;
     p->retire_time_ = retire_time_;
@@ -1114,14 +1132,29 @@ class AtomicGauge : public Gauge {
       return;
     }
 
-    IncrementBy(down_cast<AtomicGauge<T>*>(other.get())->value());
+    auto other_value = down_cast<AtomicGauge<T>*>(other.get())->value();
+    switch (type_) {
+      case MergeType::kSum:
+        IncrementBy(other_value);
+        break;
+      case MergeType::kMax:
+        set_value(std::max(value(), other_value));
+        break;
+      case MergeType::kMin:
+        set_value(std::min(value(), other_value));
+        break;
+      default:
+        LOG(FATAL) << "Unknown AtomicGauge type: " << prototype()->name();
+    }
   }
  protected:
   virtual void WriteValue(JsonWriter* writer) const OVERRIDE {
     writer->Value(value());
   }
-  AtomicInt<int64_t> value_;
  private:
+  AtomicInt<int64_t> value_;
+  MergeType type_;
+
   DISALLOW_COPY_AND_ASSIGN(AtomicGauge);
 };
 
@@ -1190,7 +1223,7 @@ class FunctionGauge : public Gauge {
  public:
   scoped_refptr<Metric> snapshot() const override {
     auto p = new FunctionGauge(down_cast<const GaugePrototype<T>*>(prototype_),
-                               Callback<T()>(function_));
+                               Callback<T()>(function_), type_);
     // The bounded function is associated with another MetricEntity instance, here we don't know
     // when it release, it's not safe to keep the function as a member, so it's needed to
     // call DetachToCurrentValue() to make it safe.
@@ -1258,14 +1291,27 @@ class FunctionGauge : public Gauge {
 
     // It's not needed to check whether a FunctionGauge is InvalidateIfNeededInMerge
     // or not, because it's always 'touched' after constructing.
-    DetachToConstant(value() + down_cast<FunctionGauge<T>*>(other.get())->value());
+    auto other_value = down_cast<FunctionGauge<T>*>(other.get())->value();
+    switch (type_) {
+      case MergeType::kSum:
+        DetachToConstant(value() + other_value);
+        break;
+      case MergeType::kMax:
+        DetachToConstant(std::max(value(), other_value));
+        break;
+      case MergeType::kMin:
+        DetachToConstant(std::min(value(), other_value));
+        break;
+      default:
+        LOG(FATAL) << "Unknown FunctionGauge type: " << prototype()->name();
+    }
   }
 
  private:
   friend class MetricEntity;
 
-  FunctionGauge(const GaugePrototype<T>* proto, Callback<T()> function)
-      : Gauge(proto), function_(std::move(function)) {
+  FunctionGauge(const GaugePrototype<T>* proto, Callback<T()> function, MergeType type)
+      : Gauge(proto), function_(std::move(function)), type_(type) {
     // Override the modification epoch to the maximum, since we don't have any idea
     // when the bound function changes value.
     m_epoch_ = std::numeric_limits<decltype(m_epoch_.load())>::max();
@@ -1277,6 +1323,8 @@ class FunctionGauge : public Gauge {
 
   mutable simple_spinlock lock_;
   Callback<T()> function_;
+  MergeType type_;
+
   DISALLOW_COPY_AND_ASSIGN(FunctionGauge);
 };
 
@@ -1472,13 +1520,14 @@ inline scoped_refptr<Histogram> MetricEntity::FindOrCreateHistogram(
 template<typename T>
 inline scoped_refptr<AtomicGauge<T> > MetricEntity::FindOrCreateGauge(
     const GaugePrototype<T>* proto,
-    const T& initial_value) {
+    const T& initial_value,
+    MergeType type) {
   CheckInstantiation(proto);
   std::lock_guard<simple_spinlock> l(lock_);
   scoped_refptr<AtomicGauge<T> > m = down_cast<AtomicGauge<T>*>(
       FindPtrOrNull(metric_map_, proto).get());
   if (!m) {
-    m = new AtomicGauge<T>(proto, initial_value);
+    m = new AtomicGauge<T>(proto, initial_value, type);
     InsertOrDie(&metric_map_, proto, m);
   }
   return m;
@@ -1500,13 +1549,14 @@ inline scoped_refptr<MeanGauge> MetricEntity::FindOrCreateMeanGauge(
 template<typename T>
 inline scoped_refptr<FunctionGauge<T> > MetricEntity::FindOrCreateFunctionGauge(
     const GaugePrototype<T>* proto,
-    const Callback<T()>& function) {
+    const Callback<T()>& function,
+    MergeType type) {
   CheckInstantiation(proto);
   std::lock_guard<simple_spinlock> l(lock_);
   scoped_refptr<FunctionGauge<T> > m = down_cast<FunctionGauge<T>*>(
       FindPtrOrNull(metric_map_, proto).get());
   if (!m) {
-    m = new FunctionGauge<T>(proto, function);
+    m = new FunctionGauge<T>(proto, function, type);
     InsertOrDie(&metric_map_, proto, m);
   }
   return m;