Posted to commits@kafka.apache.org by da...@apache.org on 2022/04/07 08:13:35 UTC

[kafka] branch trunk updated: KAFKA-13801: Kafka server does not respect MetricsReporter contract for dynamically configured reporters (#11998)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 60c0916bfd KAFKA-13801: Kafka server does not respect MetricsReporter contract for dynamically configured reporters (#11998)
60c0916bfd is described below

commit 60c0916bfdc87ddf2e01f172d5455bfeaa8e8cc7
Author: Xavier Léauté <xv...@apache.org>
AuthorDate: Thu Apr 7 01:13:15 2022 -0700

    KAFKA-13801: Kafka server does not respect MetricsReporter contract for dynamically configured reporters (#11998)
    
    The MetricsReporter.contextChange contract states that the method should always be called before MetricsReporter.init. This is done correctly for reporters enabled by default (e.g. JmxReporter) but not for metrics reporters configured dynamically.
    
    This fixes the call ordering for dynamically configured metrics reporters and updates the tests to enforce the ordering.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 core/src/main/scala/kafka/server/DynamicBrokerConfig.scala       | 5 ++++-
 .../kafka/server/DynamicBrokerReconfigurationTest.scala          | 9 +++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 2a4fd9501b..f726569e84 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -800,12 +800,15 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
     updatedConfigs.forEach { (k, v) => props.put(k, v.asInstanceOf[AnyRef]) }
     propsOverride.forKeyValue { (k, v) => props.put(k, v) }
     val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props)
+    // Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange,
+    // which provides that MetricsReporter.contextChange must be called before the first call to MetricsReporter.init.
+    // The first call to MetricsReporter.init is done when we call metrics.addReporter below.
+    KafkaBroker.notifyMetricsReporters(server.clusterId, server.config, reporters.asScala)
     reporters.forEach { reporter =>
       metrics.addReporter(reporter)
       currentReporters += reporter.getClass.getName -> reporter
     }
     KafkaBroker.notifyClusterListeners(server.clusterId, reporters.asScala)
-    KafkaBroker.notifyMetricsReporters(server.clusterId, server.config, reporters.asScala)
   }
 
   private def removeReporter(className: String): Unit = {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 24d1eb969e..d429471710 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -52,8 +52,7 @@ import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.provider.FileConfigProvider
 import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.metrics.Quota
-import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter, Quota}
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS}
 import org.apache.kafka.common.record.TimestampType
@@ -1813,6 +1812,7 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
   import TestMetricsReporter._
   val kafkaMetrics = ArrayBuffer[KafkaMetric]()
   @volatile var initializeCount = 0
+  @volatile var contextChangeCount = 0
   @volatile var configureCount = 0
   @volatile var reconfigureCount = 0
   @volatile var closeCount = 0
@@ -1820,7 +1820,12 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
   @volatile var pollingInterval: Int = -1
   testReporters.add(this)
 
+  override def contextChange(metricsContext: MetricsContext): Unit = {
+    contextChangeCount += 1
+  }
+
   override def init(metrics: util.List[KafkaMetric]): Unit = {
+    assertTrue(contextChangeCount > 0, "contextChange must be called before init")
     kafkaMetrics ++= metrics.asScala
     initializeCount += 1
   }