You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/05/29 04:15:05 UTC
[incubator-openwhisk] branch master updated: Track Kafka client
side metrics via Kamon (#4481)
This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 658516e Track Kafka client side metrics via Kamon (#4481)
658516e is described below
commit 658516e6db1bf65ea4bd82e2b5e1ef10f69e0391
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Wed May 29 09:44:54 2019 +0530
Track Kafka client side metrics via Kamon (#4481)
Adds a configurable MetricsReporter that routes Kafka client metrics to Kamon once enabled. The set of metric names to capture must be configured explicitly.
---
common/scala/src/main/resources/application.conf | 15 +++
.../connector/kafka/KafkaConsumerConnector.scala | 1 +
.../connector/kafka/KamonMetricsReporter.scala | 132 +++++++++++++++++++++
3 files changed, 148 insertions(+)
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 9d122b7..7fd870e 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -95,6 +95,10 @@ whisk {
common {
security-protocol = PLAINTEXT
ssl-endpoint-identification-algorithm = "" // restores pre-kafka 2.0.0 default
+
+ // Uncomment to report Kafka client metrics via Kamon
+ //metric-reporters = "org.apache.openwhisk.connector.kafka.KamonMetricsReporter"
+
}
producer {
acks = 1
@@ -154,6 +158,17 @@ whisk {
retention-ms = 3600000
}
}
+
+ metrics {
+ // Names of the Kafka client metrics to track via Kamon
+ names = [
+ // consumer-fetch-manager-metrics
+ "records-lag-max", // The maximum lag in terms of number of records for any partition in this window
+ "records-consumed-total" // The total number of records consumed
+ ]
+
+ report-interval = 10 seconds
+ }
}
# db related configuration
db {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
index 7e673d1..b75c689 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
@@ -142,6 +142,7 @@ class KafkaConsumerConnector(
/** Creates a new kafka consumer and subscribes to topic list if given. */
private def createConsumer(topic: String) = {
val config = Map(
+ ConsumerConfig.CLIENT_ID_CONFIG -> s"consumer-$topic",
ConsumerConfig.GROUP_ID_CONFIG -> groupid,
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahost,
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPeek.toString) ++
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala
new file mode 100644
index 0000000..4c1d792
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.connector.kafka
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import kamon.Kamon
+import kamon.metric.{Counter, Gauge, Metric}
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.stats.Total
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
+import org.apache.openwhisk.core.ConfigKeys
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration.FiniteDuration
+import scala.util.{Success, Try}
+
+/**
+ * Kafka [[MetricsReporter]] that mirrors a configured subset of Kafka client metrics into Kamon.
+ *
+ * Only metrics whose name is listed in the `metrics.names` config (and which pass
+ * [[KamonMetricsReporter.shouldIncludeMetric]]) are tracked. Their values are pushed to Kamon periodically,
+ * every `metrics.report-interval`.
+ */
+class KamonMetricsReporter extends MetricsReporter {
+  import KamonMetricsReporter._
+
+  /** Active bridges, keyed by the Kafka metric name they mirror. */
+  private val metrics = new TrieMap[MetricName, MetricBridge]()
+  private val metricConfig = loadConfigOrThrow[KafkaMetricConfig](s"${ConfigKeys.kafka}.metrics")
+
+  /** Periodic task that pushes Kafka values into Kamon; scheduled by `configure`, cancelled by `close`. */
+  @volatile
+  private var updater: Option[ScheduledFuture[_]] = None
+
+  /** Registers the metrics that already exist when the reporter is attached. */
+  override def init(metrics: util.List[KafkaMetric]): Unit = metrics.forEach(add)
+
+  /** A changed metric is re-registered so the bridge always wraps the current KafkaMetric instance. */
+  override def metricChange(metric: KafkaMetric): Unit = {
+    remove(metric)
+    add(metric)
+  }
+
+  override def metricRemoval(metric: KafkaMetric): Unit = remove(metric)
+
+  /**
+   * Stops the periodic update and removes every Kamon instrument created by this reporter, so values of a
+   * closed client do not keep being exported.
+   */
+  override def close(): Unit = {
+    updater.foreach(_.cancel(false))
+    updater = None
+    // NOTE(review): Kafka is not expected to call metricRemoval for every metric when the client shuts down,
+    // hence the explicit cleanup here - confirm against the Kafka client version in use.
+    metrics.keys.foreach(name => metrics.remove(name).foreach(_.remove()))
+  }
+
+  /**
+   * Schedules the periodic push of Kafka values into Kamon, every `report-interval`.
+   *
+   * @throws IllegalArgumentException if the configured interval is shorter than one millisecond
+   */
+  override def configure(configs: util.Map[String, _]): Unit = {
+    // Use millisecond precision: truncating to whole seconds turned sub-second intervals into 0, which
+    // scheduleAtFixedRate rejects with an IllegalArgumentException.
+    val intervalMillis = metricConfig.reportInterval.toMillis
+    require(intervalMillis > 0, s"Kafka metrics report-interval must be positive, got ${metricConfig.reportInterval}")
+    // Do not leak a previously scheduled task should configure be invoked more than once.
+    updater.foreach(_.cancel(false))
+    val task = Kamon
+      .scheduler()
+      .scheduleAtFixedRate(() => updateAll(), intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)
+    updater = Some(task)
+  }
+
+  /** Starts mirroring `metric` into Kamon if its name is configured and it passes the duplicate filter. */
+  private def add(metric: KafkaMetric): Unit = {
+    val mn = metric.metricName()
+    if (metricConfig.names.contains(mn.name()) && shouldIncludeMetric(mn)) {
+      val tags = mn.tags()
+      val metricName = kamonName(mn)
+      // Metrics backed by Kafka's `Total` stat become Kamon counters; everything else is exported as a gauge.
+      val bridge = if (isCounterMetric(metric)) {
+        val counter = Kamon.counter(metricName)
+        new CounterBridge(metric, counter, counter.refine(tags))
+      } else {
+        val gauge = Kamon.gauge(metricName)
+        new GaugeBridge(metric, gauge, gauge.refine(tags))
+      }
+      metrics.putIfAbsent(mn, bridge)
+    }
+  }
+
+  /** Stops tracking `metric` and removes its Kamon instrument, if it was being tracked. */
+  private def remove(metric: KafkaMetric): Unit = metrics.remove(metric.metricName()).foreach(_.remove())
+
+  /** Pushes the current value of every tracked Kafka metric into Kamon. */
+  private def updateAll(): Unit = metrics.values.foreach(_.update())
+}
+
+object KamonMetricsReporter {
+
+  /**
+   * Metric names that Kafka records both per client and per topic. For these, only the variant without a
+   * `topic` tag is kept (see [[shouldIncludeMetric]]).
+   */
+  private val excludedTopicAttributes = Set("records-lag-max", "records-consumed-total", "bytes-consumed-total")
+
+  /**
+   * Reporter settings.
+   *
+   * @param names Kafka metric names to mirror into Kamon
+   * @param reportInterval how often the Kafka values are pushed to Kamon
+   */
+  case class KafkaMetricConfig(names: Set[String], reportInterval: FiniteDuration)
+
+  /** Links one Kafka metric to the Kamon instrument refined with the Kafka metric's tags. */
+  abstract class MetricBridge(val kafkaMetric: KafkaMetric, kamonMetric: Metric[_]) {
+
+    /** Removes the Kamon instrument refined with this Kafka metric's tags. */
+    def remove(): Unit = kamonMetric.remove(kafkaMetric.metricName().tags())
+
+    /** Pushes the current Kafka value into the Kamon instrument. */
+    def update(): Unit
+
+    /**
+     * Current Kafka value truncated to a Long, or None when it cannot be read, is not numeric, or is not
+     * finite (NaN / infinite values have no meaningful Long representation).
+     */
+    protected def currentValue: Option[Long] =
+      Try(kafkaMetric.metricValue()).toOption.collect {
+        case n: java.lang.Number if java.lang.Double.isFinite(n.doubleValue()) => n.longValue()
+      }
+
+    /** Current Kafka value truncated to a Long, or 0 when it is unavailable. */
+    def metricValue: Long = currentValue.getOrElse(0L)
+  }
+
+  /** Exposes a Kafka metric as a Kamon gauge set to the latest value (0 when the value is unavailable). */
+  class GaugeBridge(kafkaMetric: KafkaMetric, kamonMetric: Metric[_], gauge: Gauge)
+      extends MetricBridge(kafkaMetric, kamonMetric) {
+    override def update(): Unit = gauge.set(metricValue)
+  }
+
+  /**
+   * Exposes a cumulative Kafka metric as a Kamon counter by incrementing it with the growth observed since
+   * the previous update.
+   */
+  class CounterBridge(kafkaMetric: KafkaMetric, kamonMetric: Metric[_], counter: Counter)
+      extends MetricBridge(kafkaMetric, kamonMetric) {
+    @volatile
+    private var lastValue: Long = 0
+
+    override def update(): Unit = currentValue.foreach { newValue =>
+      // Unreadable values are skipped rather than treated as 0; otherwise the next successful read would
+      // re-add the entire cumulative total. A decrease means the Kafka total restarted, so everything
+      // counted since then is new.
+      val delta = if (newValue >= lastValue) newValue - lastValue else newValue
+      if (delta > 0) counter.increment(delta)
+      lastValue = newValue
+    }
+  }
+
+  /**
+   * Kamon metric name for a Kafka metric: `<group>_<name>`, with a trailing `-total` dropped from the name
+   * because it would otherwise end up twice in Prometheus counter names.
+   */
+  def kamonName(mn: MetricName): String = s"${mn.group()}_${mn.name().stripSuffix("-total")}"
+
+  /**
+   * True when the metric is backed by Kafka's `Total` stat and should therefore be a Kamon counter.
+   * `measurable()` throws for non-measurable metrics, which are treated as gauges.
+   *
+   * NOTE(review): newer Kafka clients may expose cumulative stats through types other than `Total`;
+   * confirm against the Kafka client version in use.
+   */
+  def isCounterMetric(metric: KafkaMetric): Boolean = Try(metric.measurable()) match {
+    case Success(_: Total) => true
+    case _                 => false
+  }
+
+  /**
+   * Filters out duplicate metrics. Some metrics (see `excludedTopicAttributes`) are recorded both at the
+   * client level and per topic. Since each consumer is used for a single topic, the topic-level copy carries
+   * no extra information and is dropped.
+   */
+  def shouldIncludeMetric(m: MetricName): Boolean =
+    !(excludedTopicAttributes.contains(m.name()) && m.tags().containsKey("topic"))
+}