You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the conversion.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/19 03:59:51 UTC

[incubator-celeborn] branch main updated: [CELEBORN-229][FEATURE] Support collect metrics with customized labels (#1173)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e09b629d [CELEBORN-229][FEATURE] Support collect metrics with customized labels (#1173)
e09b629d is described below

commit e09b629da21e31e755a32fdaee646e3e4dc51948
Author: nafiy <30...@users.noreply.github.com>
AuthorDate: Thu Jan 19 11:59:48 2023 +0800

    [CELEBORN-229][FEATURE] Support collect metrics with customized labels (#1173)
---
 .../common/metrics/source/AbstractSource.scala     | 55 ++++++++++++++++------
 .../metrics/source/CelebornSourceSuite.scala       | 53 +++++++++++++++++++++
 2 files changed, 94 insertions(+), 14 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index fea2fcdb..6bdf85b8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -30,13 +30,24 @@ import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.metrics.{ResettableSlidingWindowReservoir, RssHistogram, RssTimer}
 import org.apache.celeborn.common.util.{ThreadUtils, Utils}
 
-case class NamedCounter(name: String, counter: Counter)
+private[source] trait MetricLabels {
+  val labels: Map[String, String]
+  def labelString: String = {
+    "{" + labels.map { case (key: String, value: String) => s"""$key="$value"""" }.mkString(
+      " ") + "}"
+  }
+}
+
+case class NamedCounter(name: String, counter: Counter, labels: Map[String, String])
+  extends MetricLabels
 
-case class NamedGauge[T](name: String, gauge: Gauge[T])
+case class NamedGauge[T](name: String, gauge: Gauge[T], labels: Map[String, String])
+  extends MetricLabels
 
-case class NamedHistogram(name: String, histogram: Histogram)
+case class NamedHistogram(name: String, histogram: Histogram, labels: Map[String, String])
+  extends MetricLabels
 
-case class NamedTimer(name: String, timer: Timer)
+case class NamedTimer(name: String, timer: Timer, labels: Map[String, String]) extends MetricLabels
 
 abstract class AbstractSource(conf: CelebornConf, role: String)
   extends Source with Logging {
@@ -57,32 +68,46 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
   val metricsCleaner: ScheduledExecutorService =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"worker-metrics-cleaner")
 
+  val roleLabel = "role" -> role
+
   protected val namedGauges: java.util.List[NamedGauge[_]] =
     new java.util.ArrayList[NamedGauge[_]]()
 
-  def addGauge[T](name: String, f: Unit => T): Unit = {
+  def addGauge[T](name: String, f: Unit => T): Unit = addGauge(name, f, Map.empty[String, String])
+
+  def addGauge[T](
+      name: String,
+      f: Unit => T,
+      labels: Map[String, String]): Unit = {
     val supplier: MetricRegistry.MetricSupplier[Gauge[_]] = new GaugeSupplier[T](f)
     val gauge = metricRegistry.gauge(name, supplier)
-    namedGauges.add(NamedGauge(name, gauge))
+    namedGauges.add(NamedGauge(name, gauge, labels + roleLabel))
   }
 
-  def addGauge[T](name: String, guage: Gauge[T]): Unit = {
-    namedGauges.add(NamedGauge(name, guage))
+  def addGauge[T](name: String, gauge: Gauge[T]): Unit =
+    addGauge(name, gauge, Map.empty[String, String])
+
+  def addGauge[T](name: String, gauge: Gauge[T], labels: Map[String, String]): Unit = {
+    namedGauges.add(NamedGauge(name, gauge, labels + roleLabel))
   }
 
   protected val namedTimers =
     new ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]()
 
-  def addTimer(name: String): Unit = {
-    val namedTimer = NamedTimer(name, metricRegistry.timer(name, timerSupplier))
+  def addTimer(name: String): Unit = addTimer(name, Map.empty[String, String])
+
+  def addTimer(name: String, labels: Map[String, String]): Unit = {
+    val namedTimer = NamedTimer(name, metricRegistry.timer(name, timerSupplier), labels + roleLabel)
     namedTimers.putIfAbsent(name, (namedTimer, new ConcurrentHashMap[String, Long]()))
   }
 
   protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
     new ConcurrentHashMap[String, NamedCounter]()
 
-  def addCounter(name: String): Unit = {
-    namedCounters.put(name, NamedCounter(name, metricRegistry.counter(name)))
+  def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String])
+
+  def addCounter(name: String, labels: Map[String, String] = Map.empty[String, String]): Unit = {
+    namedCounters.put(name, NamedCounter(name, metricRegistry.counter(name), labels + roleLabel))
   }
 
   protected def counters(): List[NamedCounter] = {
@@ -208,12 +233,14 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
 
   def recordCounter(nc: NamedCounter): Unit = {
     val timestamp = System.currentTimeMillis
+    val label = nc.labelString
     updateInnerMetrics(s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n")
   }
 
   def recordGauge(ng: NamedGauge[_]): Unit = {
     val timestamp = System.currentTimeMillis
     val sb = new StringBuilder
+    val label = ng.labelString
     sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gauge.getValue} $timestamp\n")
 
     updateInnerMetrics(sb.toString())
@@ -224,6 +251,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
     val sb = new mutable.StringBuilder
     val snapshot = nh.histogram.getSnapshot
     val prefix = normalizeKey(nh.name)
+    val label = nh.labelString
     sb.append(s"${prefix}Count$label ${nh.histogram.getCount} $timestamp\n")
     sb.append(s"${prefix}Max$label ${reportNanosAsMills(snapshot.getMax)} $timestamp\n")
     sb.append(s"${prefix}Mean$label ${reportNanosAsMills(snapshot.getMean)} $timestamp\n")
@@ -249,6 +277,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
     val sb = new mutable.StringBuilder
     val snapshot = nt.timer.getSnapshot
     val prefix = normalizeKey(nt.name)
+    val label = nt.labelString
     sb.append(s"${prefix}Count$label ${nt.timer.getCount} $timestamp\n")
     sb.append(s"${prefix}Max$label ${reportNanosAsMills(snapshot.getMax)} $timestamp\n")
     sb.append(s"${prefix}Mean$label ${reportNanosAsMills(snapshot.getMean)} $timestamp\n")
@@ -299,8 +328,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
   protected def reportNanosAsMills(value: Double): Double = {
     BigDecimal(value / 1000000).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
   }
-
-  val label = s"""{role="$role"}"""
 }
 
 class TimerSupplier(val slidingWindowSize: Int)
diff --git a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
new file mode 100644
index 00000000..1cbec78d
--- /dev/null
+++ b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.metrics.source
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+
+class CelebornSourceSuite extends CelebornFunSuite {
+
+  test("test getMetrics with customized label") {
+    val mockSource = new AbstractSource(new CelebornConf(), "mock") {
+      override def sourceName: String = "mockSource"
+    }
+    mockSource.addGauge("Gauge1", _ => 1000)
+    mockSource.addGauge("Gauge2", _ => 2000, Map("user" -> "user1"))
+    mockSource.addCounter("Counter1")
+    mockSource.incCounter("Counter1", 3000)
+    mockSource.addCounter("Counter2", Map("user" -> "user2"))
+    mockSource.incCounter("Counter2", 4000)
+    mockSource.addTimer("Timer1")
+    mockSource.addTimer("Timer2", Map("user" -> "user3"))
+
+    val res = mockSource.getMetrics()
+    val exp1 = "metrics_Gauge1_Value{role=\"mock\"} 1000"
+    val exp2 = "metrics_Gauge2_Value{user=\"user1\" role=\"mock\"} 2000"
+    val exp3 = "metrics_Counter1_Count{role=\"mock\"} 3000"
+    val exp4 = "metrics_Counter2_Count{user=\"user2\" role=\"mock\"} 4000"
+    val exp5 = "metrics_Timer1_Count{role=\"mock\"} 0"
+    val exp6 = "metrics_Timer2_Count{user=\"user3\" role=\"mock\"} 0"
+
+    assert(res.contains(exp1))
+    assert(res.contains(exp2))
+    assert(res.contains(exp3))
+    assert(res.contains(exp4))
+    assert(res.contains(exp5))
+    assert(res.contains(exp6))
+  }
+}