Posted to by Artem Oboturov <> on 2019/02/08 09:57:50 UTC

Troubleshooting custom implementation for a org.apache.kafka.common.metrics.MetricsReporter


I was checking out how to export metrics from a Kafka Streams app directly
to the Prometheus registry without using JMX, just by implementing a custom
MetricsReporter.

After some investigation, it became clear that a metric with the same name
could be used by multiple entities, e.g. with different client-id. So it
would be possible to differentiate them by that id.

The MetricsReporter is configured for a Kafka Streams application.

What I found strange is that MetricsReporter#init() is always called
with an empty list, so it is not possible to define the Prometheus metrics
in advance and then just update them with new values.
Hence MetricsReporter#metricChange() is used both to update metered
values and to define new metrics that have not been set up yet. Here
comes the second problem: the labels are variable, i.e. the label set changes
after a metric was first initialized, so when setting values I often hit a
constraint violation in Prometheus, because it requires the label set to
be fixed.
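
One way this error can arise is sketched below with simplified stand-ins rather than the real Kafka and Prometheus classes. It assumes the two strings in MKey are the metric name and group (the tuple contents are elided in the archived message), so two metrics that share name and group but carry different tag sets end up on the same gauge. The metric name, group and tag values are illustrative only; `Name`, `FixedLabelGauge` and `NaiveReporter` are hypothetical names.

```scala
import scala.collection.mutable
import scala.util.Try

object LabelMismatchDemo {
  // Simplified stand-in for a Kafka MetricName: name, group and a tag map.
  final case class Name(name: String, group: String, tags: Map[String, String])

  // Hypothetical stand-in for a Prometheus gauge: label names are fixed at
  // creation and every update must supply exactly that many label values.
  final class FixedLabelGauge(val labelNames: Seq[String]) {
    private val samples = mutable.Map.empty[Seq[String], Double]
    def set(labelValues: Seq[String], value: Double): Unit = {
      if (labelValues.size != labelNames.size)
        throw new IllegalArgumentException("Incorrect number of labels.")
      samples.update(labelValues, value)
    }
  }

  // Reporter whose cache key ignores the tags (assumption: key = name + group).
  final class NaiveReporter {
    private val cache = mutable.Map.empty[(String, String), FixedLabelGauge]
    def report(n: Name, value: Double): Try[Unit] = Try {
      // The first metric seen for a (name, group) pair decides the label names.
      val gauge = cache.getOrElseUpdate((n.name, n.group), new FixedLabelGauge(n.tags.keys.toSeq))
      gauge.set(n.tags.values.toSeq, value)
    }
  }

  def main(args: Array[String]): Unit = {
    val reporter = new NaiveReporter
    val perClient = Name("records-lead-min", "consumer-fetch-manager-metrics", Map("client-id" -> "c1"))
    val perPartition = perClient.copy(tags = perClient.tags ++ Map("topic" -> "t", "partition" -> "0"))
    println(reporter.report(perClient, 1.0).isSuccess)    // true: gauge created with one label
    println(reporter.report(perPartition, 2.0).isFailure) // true: three values for a one-label gauge
  }
}
```

If this reading is right, the labels of an individual metric never change; the second metric is simply a different MetricName that collides with the first one in the cache.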

*ENV:*
Kafka Streams (Scala): 2.1.0-cp1
Kafka:, GCP
Java: 8

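java.lang.IllegalArgumentException: Incorrect number of labels.
at io.prometheus.client.SimpleCollector.labels( ~[simpleclient-0.6.0.jar:?]
at privateimpl.KafkaStreamsPrometheusMetricsReporter.metricChange(KafkaStreamsPrometheusMetricsReporter.scala:82) ~[classes/:?]
at org.apache.kafka.common.metrics.Metrics.registerMetric( [kafka-clients-2.1.0-cp1.jar:?]
at org.apache.kafka.common.metrics.Sensor.add( [kafka-clients-2.1.0-cp1.jar:?]
at org.apache.kafka.common.metrics.Sensor.add( [kafka-clients-2.1.0-cp1.jar:?]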

Test implementation of the MetricsReporter:

package privateimpl

import java.util
import java.util.concurrent.ConcurrentHashMap

import com.typesafe.scalalogging.LazyLogging
import privateimpl.KafkaStreamsPrometheusMetricsReporter.MKey
import io.prometheus.client.{Collector, CollectorRegistry, Gauge}
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.KafkaMetric

import scala.util.{Failure, Success, Try}

object KafkaStreamsPrometheusMetricsReporter {
    type MKey = (String, String)
//  type MKey = MetricName

  private def toKey(metric: KafkaMetric): MKey = {
    val name = metric.metricName()
    (,
//    name
  }

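  private def toPrometheusMetric(metric: KafkaMetric): Gauge = {
    val name = metric.metricName()
    // Label names are taken from the tag keys of this particular metric.
    val labels = name.tags().keySet().toArray(Array.empty[String]).map {
      Collector.sanitizeMetricName
    }

    Gauge
      .build(Collector.sanitizeMetricName(, Collector.sanitizeMetricName(name.description()))
      .namespace(Collector.sanitizeMetricName(
      .help(s"Kafka description: ${name.description()}")
      .labelNames(labels: _*)
      .create()
  }
}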

class KafkaStreamsPrometheusMetricsReporter extends
org.apache.kafka.common.metrics.MetricsReporter with LazyLogging {

  import KafkaStreamsPrometheusMetricsReporter._

  private val registry = new CollectorRegistry(true)
  private val metricsCache = new ConcurrentHashMap[MKey, Gauge]

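  private def getCachedMetric(metric: KafkaMetric): Gauge = {
    // Create and register the gauge the first time a key is seen.
    metricsCache.computeIfAbsent(toKey(metric), _ => {
      val p = toPrometheusMetric(metric)
      try {
        p.register(registry)
      } catch {
        case ex: IllegalArgumentException =>
          println(ex)
          throw ex
      }
      p
    })
  }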

  override def init(metrics: util.List[KafkaMetric]): Unit = {
    metrics.forEach { m =>
      getCachedMetric(m)
      ()
    }
  }


  override def metricChange(metric: KafkaMetric): Unit = {
    val name = metric.metricName()
    val collector = getCachedMetric(metric)
    if (collector == null) {
      logger.error("Kafka metric name was not registered: {}", name)
    } else {
      // Label values come from the tag values of this particular metric.
      val labels = name.tags().values().toArray(Array.empty[String])

      Try {
        metric.metricValue()
      } match {
        case Success(value) =>
          value match {
            case d: java.lang.Double =>
              collector.labels(labels: _*).set(d)
            case v =>
              println(v)
          }
        case Failure(ex) =>
          // Pass the exception as the cause so that its stack trace is logged.
          logger.error(s"Failed to process $name", ex)
      }
    }
  }

  override def metricRemoval(metric: KafkaMetric): Unit = ()

  override def close(): Unit = ()

  override def configure(configs: util.Map[String, _]): Unit = ()
}

Re: Troubleshooting custom implementation for a org.apache.kafka.common.metrics.MetricsReporter

Posted by Guozhang Wang <>.
Hello Artem,

Your observation is correct: for Streams, as well as for many other clients,
MetricsReporter#metricChange() is primarily used for registering new
metrics. This is because at construction time of the client (hence when
MetricsReporter#init() is called) the finer-grained metrics, like
per-task / process, or per-destination-broker (for producer / consumer
clients, e.g.), are not known yet, so they have to be created later during
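
The lifecycle described in this paragraph can be sketched with simplified stand-ins (not the Kafka classes): init() receives whatever exists at construction time, possibly nothing, and every later metric arrives through metricChange(). `Metric`, `Reporter` and the metric and tag names are hypothetical and only illustrate the pattern.

```scala
import scala.collection.mutable

object LazyRegistrationDemo {
  // Simplified stand-in for a metric: a name plus its tags.
  final case class Metric(name: String, tags: Map[String, String])

  // Mirrors the two callbacks discussed here: init() at client construction,
  // metricChange() whenever a metric is added or updated afterwards.
  final class Reporter {
    private val registered = mutable.LinkedHashSet.empty[Metric]

    // Route the initial metrics through the same path as later ones.
    def init(initial: Seq[Metric]): Unit = initial.foreach(metricChange)

    // Register on first sight; repeated calls for the same metric are no-ops.
    def metricChange(m: Metric): Unit = registered += m

    def known: List[Metric] = registered.toList
  }

  def main(args: Array[String]): Unit = {
    val reporter = new Reporter
    reporter.init(Nil)                 // nothing is known at construction time
    println(reporter.known.size)       // 0
    val perTask = Metric("process-rate", Map("task-id" -> "0_0"))
    reporter.metricChange(perTask)     // created later, once the task exists
    reporter.metricChange(perTask)     // a second callback does not duplicate it
    println(reporter.known.size)       // 1
  }
}
```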

As for your second question: I'm not sure what you mean by `metric
labels`, and how they are modified. A `MetricName` contains a name /
group-name, and a tags map, all of which should be fixed when being
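
Since the tags map is part of a `MetricName`, a reporter that needs a fixed label set per collector could include the tag keys in its cache key. A minimal sketch, assuming a simplified stand-in for `MetricName`; `Name`, `Key`, `keyOf` and `labelValues` are hypothetical names and the metric data is illustrative:

```scala
object TagAwareKeyDemo {
  // Simplified stand-in for org.apache.kafka.common.MetricName.
  final case class Name(name: String, group: String, tags: Map[String, String])

  // Cache key: the tag keys are part of the identity, the tag values are not.
  final case class Key(group: String, name: String, labelNames: List[String])

  // Sorting makes the key independent of the iteration order of the tag map.
  def keyOf(n: Name): Key = Key(n.group, n.name, n.tags.keys.toList.sorted)

  // Label values in the same (sorted) order as the label names in the key.
  def labelValues(n: Name): List[String] = keyOf(n).labelNames.map(n.tags)

  def main(args: Array[String]): Unit = {
    val c1 = Name("records-lead-min", "consumer-fetch-manager-metrics", Map("client-id" -> "c1"))
    val c2 = c1.copy(tags = Map("client-id" -> "c2"))
    val p0 = c1.copy(tags = Map("topic" -> "t", "partition" -> "0", "client-id" -> "c1"))
    println(keyOf(c1) == keyOf(c2)) // true: same label names, different values
    println(keyOf(c1) == keyOf(p0)) // false: different label names
    println(keyOf(p0).labelNames)   // List(client-id, partition, topic)
    println(labelValues(p0))        // List(c1, 0, t)
  }
}
```

With such a key, metrics that differ only in tag values share one collector, while a metric with a different set of tag keys gets its own. As far as I know, a Prometheus registry does not accept two collectors with the same metric name, so the second collector would also need a distinct name, for example one derived from the label names.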


On Fri, Feb 8, 2019 at 3:22 PM Artem Oboturov <> wrote:

> Hi
> I was checking out how to export metrics from a Kafka Streams App directly
> to the Prometheus Registry without using JMX, just by implementing a custom
> MetricsReporter.
> After some investigation, it became clear that a metric with the same name
> could be used by multiple entities, e.g. with different client-id. So it
> would be possible to differentiate them by that id.
> The MetricsReporter is configured for a Kafka Streams application.
> What I felt strange was that the MetricsReporter#init() is always called
> with an empty list, so it is not possible to define Prometheus metrics
> properly in advance and then keep them there to be updated with new values.
> Hence the MetricsReporter#metricChange() is used to both make changes to
> metered values and to define new metrics if they were not yet set up. Here
> comes a second problem: labels are variable, i.e. they are modified after
> they were first initialized, so that trying to set values, I often have the
> constraint violation in Prometheus, because it requires the labels set to
> be fixed.
> *ENV:*
> Kafka Streams (Scala): 2.1.0-cp1
> Kafka:, GCP
> Java: 8
> java.lang.IllegalArgumentException: Incorrect number of labels.
> at io.prometheus.client.SimpleCollector.labels(
> ~[simpleclient-0.6.0.jar:?]
> at
> privateimpl.KafkaStreamsPrometheusMetricsReporter.metricChange(KafkaStreamsPrometheusMetricsReporter.scala:82)
> ~[classes/:?]
> at org.apache.kafka.common.metrics.Metrics.registerMetric(
> [kafka-clients-2.1.0-cp1.jar:?]
> at org.apache.kafka.common.metrics.Sensor.add(
> [kafka-clients-2.1.0-cp1.jar:?]
> at org.apache.kafka.common.metrics.Sensor.add(
> [kafka-clients-2.1.0-cp1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.recordPartitionLead(
> [kafka-clients-2.1.0-cp1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.access$1600(
> [kafka-clients-2.1.0-cp1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(
> [kafka-clients-2.1.0-cp1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(
> [kafka-clients-2.1.0-cp1.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(
> [kafka-clients-2.1.0-cp1.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> [kafka-clients-2.1.0-cp1.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> [kafka-clients-2.1.0-cp1.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(
> [kafka-streams-2.1.0-cp1.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> [kafka-streams-2.1.0-cp1.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> [kafka-streams-2.1.0-cp1.jar:?]
> at
> [kafka-streams-2.1.0-cp1.jar:?]
> Test implementation of the MetricsReporter:
> package privateimpl
> import java.util
> import java.util.concurrent.ConcurrentHashMap
> import com.typesafe.scalalogging.LazyLogging
> import privateimpl.KafkaStreamsPrometheusMetricsReporter.MKey
> import io.prometheus.client.{Collector, CollectorRegistry, Gauge}
> import org.apache.kafka.common.MetricName
> import org.apache.kafka.common.metrics.KafkaMetric
> import scala.util.{Failure, Success, Try}
> object KafkaStreamsPrometheusMetricsReporter {
>     type MKey = (String, String)
> //  type MKey = MetricName
>   private def toKey(metric: KafkaMetric): MKey = {
>     val name = metric.metricName()
>     (,
> //    name
>   }
>   private def toPrometheusMetric(metric: KafkaMetric): Gauge = {
>     val name = metric.metricName()
>     val labels = name.tags().keySet().toArray(Array.empty[String]).map {
>       Collector.sanitizeMetricName
>     }
>     Gauge
>       .build(Collector.sanitizeMetricName(,
> Collector.sanitizeMetricName(name.description()))
>       .namespace(Collector.sanitizeMetricName(
>       .help(s"Kafka description: ${name.description()}")
>       .labelNames(labels: _*)
>       .create()
>   }
> }
> class KafkaStreamsPrometheusMetricsReporter extends
> org.apache.kafka.common.metrics.MetricsReporter with LazyLogging {
>   import KafkaStreamsPrometheusMetricsReporter._
>   private val registry = new CollectorRegistry(true)
>   private val metricsCache = new ConcurrentHashMap[MKey, Gauge]
>   private def getCachedMetric(metric: KafkaMetric): Gauge = {
>     metricsCache.computeIfAbsent(toKey(metric), _ => {
>       val p = toPrometheusMetric(metric)
>       try {
>         p.register(registry)
>       } catch {
>         case ex: IllegalArgumentException =>
>           println(ex)
>           throw ex
>       }
>       p
>     })
>   }
>   override def init(metrics: util.List[KafkaMetric]): Unit = {
>     metrics.forEach { m =>
>       getCachedMetric(m)
>       ()
>     }
>   }
>   override def metricChange(metric: KafkaMetric): Unit = {
>     val name = metric.metricName()
>     val collector = getCachedMetric(metric)
>     if (collector == null) {
>       logger.error("Kafka metric name was not registered: {}", name)
>     } else {
>       val lables = name.tags().values().toArray(Array.empty[String])
>       Try {
>         metric.metricValue()
>       } match {
>         case Success(value) =>
>           value match {
>             case d: java.lang.Double =>
>               collector.labels(lables: _*).set(d)
>             case v =>
>               println(v)
>           }
>         case Failure(ex) =>
>           logger.error("Failed to process {}", ex)
>       }
>     }
>   }
>   override def metricRemoval(metric: KafkaMetric): Unit = ()
>   override def close(): Unit = ()
>   override def configure(configs: util.Map[String, _]): Unit = ()
> }

-- Guozhang