Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/05/29 04:15:05 UTC

[incubator-openwhisk] branch master updated: Track Kafka client side metrics via Kamon (#4481)

This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 658516e  Track Kafka client side metrics via Kamon (#4481)
658516e is described below

commit 658516e6db1bf65ea4bd82e2b5e1ef10f69e0391
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Wed May 29 09:44:54 2019 +0530

    Track Kafka client side metrics via Kamon (#4481)
    
    Adds a configurable MetricsReporter that routes Kafka client metrics to Kamon when enabled. The set of metric names to capture must be configured explicitly.
---
 common/scala/src/main/resources/application.conf   |  15 +++
 .../connector/kafka/KafkaConsumerConnector.scala   |   1 +
 .../connector/kafka/KamonMetricsReporter.scala     | 132 +++++++++++++++++++++
 3 files changed, 148 insertions(+)
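
Concretely, "enabled" in the commit message above means wiring the reporter in
through Kafka's standard `metric.reporters` client property (the commented-out
line in application.conf below). A minimal Scala sketch of the same setting
expressed as a raw client property; the property constant is stock kafka-clients
API, only the reporter class comes from this commit:

    import org.apache.kafka.clients.CommonClientConfigs

    // Equivalent of the commented-out application.conf line below:
    // route Kafka client metrics through the Kamon-backed reporter.
    val reporterProps = Map(
      CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG ->
        "org.apache.openwhisk.connector.kafka.KamonMetricsReporter")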

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 9d122b7..7fd870e 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -95,6 +95,10 @@ whisk {
         common {
             security-protocol = PLAINTEXT
             ssl-endpoint-identification-algorithm = "" // restores pre-kafka 2.0.0 default
+
+            // Enable this to report Kafka client metrics
+            //metric-reporters = "org.apache.openwhisk.connector.kafka.KamonMetricsReporter"
+
         }
         producer {
             acks = 1
@@ -154,6 +158,17 @@ whisk {
                 retention-ms    = 3600000
             }
         }
+
+        metrics {
+            // Names of the metrics which should be tracked via Kamon
+            names = [
+                // consumer-fetch-manager-metrics
+                "records-lag-max", // The maximum lag in terms of number of records for any partition in this window
+                "records-consumed-total" // The total number of records consumed
+            ]
+
+            report-interval = 10 seconds
+        }
     }
     # db related configuration
     db {
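
The new `whisk.kafka.metrics` block above is read by the reporter via
pureconfig. A minimal sketch of that load, assuming `ConfigKeys.kafka` resolves
to "whisk.kafka" (the case class mirrors the one in KamonMetricsReporter's
companion object further below):

    import pureconfig.loadConfigOrThrow
    import scala.concurrent.duration.FiniteDuration

    case class KafkaMetricConfig(names: Set[String], reportInterval: FiniteDuration)

    // With the defaults above this yields:
    //   names          == Set("records-lag-max", "records-consumed-total")
    //   reportInterval == 10.seconds
    val metricConfig = loadConfigOrThrow[KafkaMetricConfig]("whisk.kafka.metrics")
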
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
index 7e673d1..b75c689 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaConsumerConnector.scala
@@ -142,6 +142,7 @@ class KafkaConsumerConnector(
   /** Creates a new kafka consumer and subscribes to topic list if given. */
   private def createConsumer(topic: String) = {
     val config = Map(
+      ConsumerConfig.CLIENT_ID_CONFIG -> s"consumer-$topic",
       ConsumerConfig.GROUP_ID_CONFIG -> groupid,
       ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahost,
       ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPeek.toString) ++
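
The explicit `client.id` added above is what makes the reporter's tags useful:
Kafka stamps every client metric with a `client-id` tag, and KamonMetricsReporter
refines its Kamon instruments with those tags, so a deterministic id avoids the
auto-generated consumer-1, consumer-2, ... ids that vary with creation order.
A sketch with a hypothetical topic name:

    import org.apache.kafka.clients.consumer.ConsumerConfig

    // "health" is an illustrative topic; the id matches the diff's pattern.
    val topic = "health"
    val clientIdEntry = ConsumerConfig.CLIENT_ID_CONFIG -> s"consumer-$topic"
    // Metrics from this consumer then carry the tag client-id=consumer-health.
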
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala
new file mode 100644
index 0000000..4c1d792
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.connector.kafka
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+
+import kamon.Kamon
+import kamon.metric.{Counter, Gauge, Metric}
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.stats.Total
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
+import org.apache.openwhisk.core.ConfigKeys
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration.FiniteDuration
+import scala.util.{Success, Try}
+
+class KamonMetricsReporter extends MetricsReporter {
+  import KamonMetricsReporter._
+  private val metrics = new TrieMap[MetricName, MetricBridge]()
+  private val metricConfig = loadConfigOrThrow[KafkaMetricConfig](s"${ConfigKeys.kafka}.metrics")
+  @volatile
+  private var updater: Option[ScheduledFuture[_]] = None
+
+  override def init(metrics: util.List[KafkaMetric]): Unit = metrics.forEach(add)
+
+  override def metricChange(metric: KafkaMetric): Unit = {
+    remove(metric)
+    add(metric)
+  }
+
+  override def metricRemoval(metric: KafkaMetric): Unit = remove(metric)
+
+  override def close(): Unit = updater.foreach(_.cancel(false))
+
+  override def configure(configs: util.Map[String, _]): Unit = {
+    val interval = metricConfig.reportInterval.toSeconds
+    val f = Kamon.scheduler().scheduleAtFixedRate(() => updateAll(), interval, interval, TimeUnit.SECONDS)
+    updater = Some(f)
+  }
+
+  private def add(metric: KafkaMetric): Unit = {
+    val mn = metric.metricName()
+    if (metricConfig.names.contains(mn.name()) && shouldIncludeMetric(mn)) {
+      val tags = mn.tags()
+      val metricName = kamonName(mn)
+      val bridge = if (isCounterMetric(metric)) {
+        val counter = Kamon.counter(metricName)
+        new CounterBridge(metric, counter, counter.refine(tags))
+      } else {
+        val gauge = Kamon.gauge(metricName)
+        new GaugeBridge(metric, gauge, gauge.refine(tags))
+      }
+      metrics.putIfAbsent(mn, bridge)
+    }
+  }
+
+  private def remove(metric: KafkaMetric) = metrics.remove(metric.metricName()).foreach(_.remove())
+
+  private def updateAll(): Unit = metrics.values.foreach(_.update())
+}
+
+object KamonMetricsReporter {
+  private val excludedTopicAttributes = Set("records-lag-max", "records-consumed-total", "bytes-consumed-total")
+
+  case class KafkaMetricConfig(names: Set[String], reportInterval: FiniteDuration)
+
+  abstract class MetricBridge(val kafkaMetric: KafkaMetric, kamonMetric: Metric[_]) {
+    def remove(): Unit = kamonMetric.remove(kafkaMetric.metricName().tags())
+    def update(): Unit
+
+    def metricValue: Long =
+      Try(kafkaMetric.metricValue())
+        .map {
+          case d: java.lang.Double => d.toLong
+          case _                   => 0L
+        }
+        .getOrElse(0L)
+  }
+
+  class GaugeBridge(kafkaMetric: KafkaMetric, kamonMetric: Metric[_], gauge: Gauge)
+      extends MetricBridge(kafkaMetric, kamonMetric) {
+    override def update(): Unit = gauge.set(metricValue)
+  }
+
+  class CounterBridge(kafkaMetric: KafkaMetric, kamonMetric: Metric[_], counter: Counter)
+      extends MetricBridge(kafkaMetric, kamonMetric) {
+    @volatile
+    private var lastValue: Long = 0
+    override def update(): Unit = {
+      val newValue = metricValue
+      counter.increment(newValue - lastValue)
+      lastValue = newValue
+    }
+  }
+
+  def kamonName(mn: MetricName): String = {
+    // Drop the `-total` suffix as it results in Prometheus metrics ending with "total" twice
+    val name = if (mn.name().endsWith("-total")) mn.name().dropRight(6) else mn.name()
+    s"${mn.group()}_$name"
+  }
+
+  def isCounterMetric(metric: KafkaMetric): Boolean = Try(metric.measurable()) match {
+    case Success(_: Total) => true
+    case _                 => false
+  }
+
+  def shouldIncludeMetric(m: MetricName): Boolean = {
+    // Avoid duplicate metrics for cases which are recorded at multiple levels.
+    // For example `bytes-consumed-total` is recorded at both consumer and topic level. As we use one
+    // consumer per topic, we can drop the topic-level recording.
+    if (excludedTopicAttributes.contains(m.name())) !m.tags().containsKey("topic")
+    else true
+  }
+}
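
To make the naming and filtering above concrete, a REPL-style sketch using the
public kafka-clients MetricName constructor (tag values are illustrative):

    import org.apache.kafka.common.MetricName
    import org.apache.openwhisk.connector.kafka.KamonMetricsReporter._
    import scala.collection.JavaConverters._

    val consumerLevel = new MetricName(
      "records-consumed-total",
      "consumer-fetch-manager-metrics",
      "",
      Map("client-id" -> "consumer-health").asJava)

    // "-total" is dropped so the exported name does not end with "total" twice:
    kamonName(consumerLevel)           // "consumer-fetch-manager-metrics_records-consumed"
    shouldIncludeMetric(consumerLevel) // true: no "topic" tag

    val topicLevel = new MetricName(
      "records-consumed-total",
      "consumer-fetch-manager-metrics",
      "",
      Map("client-id" -> "consumer-health", "topic" -> "health").asJava)
    shouldIncludeMetric(topicLevel)    // false: topic-level duplicate is skipped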