You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/19 03:59:51 UTC
[incubator-celeborn] branch main updated: [CELEBORN-229][FEATURE] Support collect metrics with customized labels (#1173)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e09b629d [CELEBORN-229][FEATURE] Support collect metrics with customized labels (#1173)
e09b629d is described below
commit e09b629da21e31e755a32fdaee646e3e4dc51948
Author: nafiy <30...@users.noreply.github.com>
AuthorDate: Thu Jan 19 11:59:48 2023 +0800
[CELEBORN-229][FEATURE] Support collect metrics with customized labels (#1173)
---
.../common/metrics/source/AbstractSource.scala | 87 ++++++++++++++++++----
.../metrics/source/CelebornSourceSuite.scala | 65 ++++++++++++++++
2 files changed, 138 insertions(+), 14 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index fea2fcdb..6bdf85b8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -30,13 +30,36 @@ import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.{ResettableSlidingWindowReservoir, RssHistogram, RssTimer}
import org.apache.celeborn.common.util.{ThreadUtils, Utils}
-case class NamedCounter(name: String, counter: Counter)
+/**
+ * Mixin for named metrics that carry Prometheus-style labels.
+ */
+private[source] trait MetricLabels {
+ val labels: Map[String, String]
+
+ /**
+ * Renders `labels` as a Prometheus text-format label set, e.g. `{role="worker",user="u1"}`.
+ * Labels are sorted by name so the output does not depend on Map iteration order, pairs are
+ * comma-separated as the exposition format requires, and values are escaped.
+ */
+ def labelString: String = labels.toList.sortBy(_._1).map { case (key, value) =>
+ s"""$key="${escapeLabelValue(value)}""""
+ }.mkString("{", ",", "}")
+
+ // The exposition format requires backslash, double quote and line feed to be escaped.
+ private def escapeLabelValue(value: String): String =
+ value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n")
+}
+
+case class NamedCounter(name: String, counter: Counter, labels: Map[String, String])
+ extends MetricLabels
-case class NamedGauge[T](name: String, gauge: Gauge[T])
+case class NamedGauge[T](name: String, gauge: Gauge[T], labels: Map[String, String])
+ extends MetricLabels
-case class NamedHistogram(name: String, histogram: Histogram)
+case class NamedHistogram(name: String, histogram: Histogram, labels: Map[String, String])
+ extends MetricLabels
-case class NamedTimer(name: String, timer: Timer)
+case class NamedTimer(name: String, timer: Timer, labels: Map[String, String]) extends MetricLabels
abstract class AbstractSource(conf: CelebornConf, role: String)
extends Source with Logging {
@@ -57,32 +80,66 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
val metricsCleaner: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"worker-metrics-cleaner")
+ // Attached to every metric registered below. `labels + roleLabel` overwrites any
+ // caller-supplied "role" label with this source's role.
+ val roleLabel: (String, String) = "role" -> role
+
protected val namedGauges: java.util.List[NamedGauge[_]] =
new java.util.ArrayList[NamedGauge[_]]()
- def addGauge[T](name: String, f: Unit => T): Unit = {
+ /** Registers a gauge backed by `f`, labelled only with this source's role. */
+ def addGauge[T](name: String, f: Unit => T): Unit = addGauge(name, f, Map.empty[String, String])
+
+ /** Registers a gauge backed by `f` with extra `labels` plus this source's role label. */
+ def addGauge[T](
+ name: String,
+ f: Unit => T,
+ labels: Map[String, String]): Unit = {
val supplier: MetricRegistry.MetricSupplier[Gauge[_]] = new GaugeSupplier[T](f)
val gauge = metricRegistry.gauge(name, supplier)
- namedGauges.add(NamedGauge(name, gauge))
+ namedGauges.add(NamedGauge(name, gauge, labels + roleLabel))
}
- def addGauge[T](name: String, guage: Gauge[T]): Unit = {
- namedGauges.add(NamedGauge(name, guage))
+ /** Tracks an existing `gauge`, labelled only with this source's role. */
+ def addGauge[T](name: String, gauge: Gauge[T]): Unit =
+ addGauge(name, gauge, Map.empty[String, String])
+
+ /**
+ * Tracks an existing `gauge` with extra `labels` plus this source's role label. Unlike the
+ * `f: Unit => T` variants, this does not register anything in `metricRegistry`.
+ */
+ def addGauge[T](name: String, gauge: Gauge[T], labels: Map[String, String]): Unit = {
+ namedGauges.add(NamedGauge(name, gauge, labels + roleLabel))
}
protected val namedTimers =
new ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]()
- def addTimer(name: String): Unit = {
- val namedTimer = NamedTimer(name, metricRegistry.timer(name, timerSupplier))
+ /** Registers a timer labelled only with this source's role. */
+ def addTimer(name: String): Unit = addTimer(name, Map.empty[String, String])
+
+ /**
+ * Registers a timer with extra `labels` plus this source's role label. If `name` is already
+ * tracked, the existing entry (and its labels) is kept.
+ */
+ def addTimer(name: String, labels: Map[String, String]): Unit = {
+ val namedTimer = NamedTimer(name, metricRegistry.timer(name, timerSupplier), labels + roleLabel)
namedTimers.putIfAbsent(name, (namedTimer, new ConcurrentHashMap[String, Long]()))
}
protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
new ConcurrentHashMap[String, NamedCounter]()
- def addCounter(name: String): Unit = {
- namedCounters.put(name, NamedCounter(name, metricRegistry.counter(name)))
+ /** Registers a counter labelled only with this source's role. */
+ def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String])
+
+ /**
+ * Registers a counter with extra `labels` plus this source's role label, replacing any entry
+ * already tracked under `name`. `labels` has no default: the one-argument overload covers
+ * that case, and combining overloads with default arguments invites resolution surprises.
+ */
+ def addCounter(name: String, labels: Map[String, String]): Unit = {
+ namedCounters.put(name, NamedCounter(name, metricRegistry.counter(name), labels + roleLabel))
}
protected def counters(): List[NamedCounter] = {
@@ -208,12 +265,14 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
def recordCounter(nc: NamedCounter): Unit = {
val timestamp = System.currentTimeMillis
+ val label = nc.labelString
updateInnerMetrics(s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n")
}
def recordGauge(ng: NamedGauge[_]): Unit = {
val timestamp = System.currentTimeMillis
val sb = new StringBuilder
+ val label = ng.labelString
sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gauge.getValue} $timestamp\n")
updateInnerMetrics(sb.toString())
@@ -224,6 +283,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
val sb = new mutable.StringBuilder
val snapshot = nh.histogram.getSnapshot
val prefix = normalizeKey(nh.name)
+ val label = nh.labelString
sb.append(s"${prefix}Count$label ${nh.histogram.getCount} $timestamp\n")
sb.append(s"${prefix}Max$label ${reportNanosAsMills(snapshot.getMax)} $timestamp\n")
sb.append(s"${prefix}Mean$label ${reportNanosAsMills(snapshot.getMean)} $timestamp\n")
@@ -249,6 +309,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
val sb = new mutable.StringBuilder
val snapshot = nt.timer.getSnapshot
val prefix = normalizeKey(nt.name)
+ val label = nt.labelString
sb.append(s"${prefix}Count$label ${nt.timer.getCount} $timestamp\n")
sb.append(s"${prefix}Max$label ${reportNanosAsMills(snapshot.getMax)} $timestamp\n")
sb.append(s"${prefix}Mean$label ${reportNanosAsMills(snapshot.getMean)} $timestamp\n")
@@ -299,8 +360,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
protected def reportNanosAsMills(value: Double): Double = {
BigDecimal(value / 1000000).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
}
-
- val label = s"""{role="$role"}"""
}
class TimerSupplier(val slidingWindowSize: Int)
diff --git a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
new file mode 100644
index 00000000..1cbec78d
--- /dev/null
+++ b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.metrics.source
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+
+class CelebornSourceSuite extends CelebornFunSuite {
+
+ private def newMockSource(): AbstractSource = new AbstractSource(new CelebornConf(), "mock") {
+ override def sourceName: String = "mockSource"
+ }
+
+ test("test getMetrics with customized label") {
+ val mockSource = newMockSource()
+ mockSource.addGauge("Gauge1", _ => 1000)
+ mockSource.addGauge("Gauge2", _ => 2000, Map("user" -> "user1"))
+ mockSource.addCounter("Counter1")
+ mockSource.incCounter("Counter1", 3000)
+ mockSource.addCounter("Counter2", Map("user" -> "user2"))
+ mockSource.incCounter("Counter2", 4000)
+ mockSource.addTimer("Timer1")
+ mockSource.addTimer("Timer2", Map("user" -> "user3"))
+
+ val res = mockSource.getMetrics()
+ // Labels are rendered sorted by name and separated by commas.
+ val exp1 = "metrics_Gauge1_Value{role=\"mock\"} 1000"
+ val exp2 = "metrics_Gauge2_Value{role=\"mock\",user=\"user1\"} 2000"
+ val exp3 = "metrics_Counter1_Count{role=\"mock\"} 3000"
+ val exp4 = "metrics_Counter2_Count{role=\"mock\",user=\"user2\"} 4000"
+ val exp5 = "metrics_Timer1_Count{role=\"mock\"} 0"
+ val exp6 = "metrics_Timer2_Count{role=\"mock\",user=\"user3\"} 0"
+
+ assert(res.contains(exp1))
+ assert(res.contains(exp2))
+ assert(res.contains(exp3))
+ assert(res.contains(exp4))
+ assert(res.contains(exp5))
+ assert(res.contains(exp6))
+ }
+
+ test("test label values are escaped") {
+ val mockSource = newMockSource()
+ mockSource.addCounter("Counter3", Map("user" -> "a\"b\\c"))
+ mockSource.incCounter("Counter3", 5000)
+
+ val res = mockSource.getMetrics()
+ assert(res.contains("metrics_Counter3_Count{role=\"mock\",user=\"a\\\"b\\\\c\"} 5000"))
+ }
+}