You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/07/21 23:59:21 UTC

[kafka] branch trunk updated: KAFKA-14051: Create metrics reporters in KRaft remote controllers (#12396)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06462c7be1 KAFKA-14051: Create metrics reporters in KRaft remote controllers (#12396)
06462c7be1 is described below

commit 06462c7be175cb730b20de0333d1ac48b3286282
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Thu Jul 21 19:59:05 2022 -0400

    KAFKA-14051: Create metrics reporters in KRaft remote controllers (#12396)
    
    KRaft remote controllers do not yet support dynamic reconfiguration (https://issues.apache.org/jira/browse/KAFKA-14057). Until that is implemented, note that the configured metric reporters are currently instantiated as part of the wiring for dynamic reconfiguration. Since that wiring does not exist yet for KRaft remote controllers, this patch separates the instantiation of the metric reporters from their reconfiguration and adjusts the contro [...]
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../main/scala/kafka/server/ControllerServer.scala | 15 +++++++
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 46 +++++++++++++++-------
 2 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 67b3f0276d..cff88d2b6b 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -28,6 +28,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.RaftManager
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
+import kafka.server.KafkaRaftServer.BrokerRole
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.clients.ApiVersions
@@ -68,6 +69,8 @@ class ControllerServer(
 ) extends Logging with KafkaMetricsGroup {
   import kafka.server.Server._
 
+  config.dynamicConfig.initialize(zkClientOpt = None)
+
   val lock = new ReentrantLock()
   val awaitShutdownCond = lock.newCondition()
   var status: ProcessStatus = SHUTDOWN
@@ -97,6 +100,13 @@ class ControllerServer(
     true
   }
 
+  private def doRemoteKraftSetup(): Unit = {
+    // Explicitly configure metric reporters on this remote controller.
+    // We do not yet support dynamic reconfiguration on remote controllers in general;
+    // remove this once that is implemented.
+    new DynamicMetricReporterState(config.nodeId, config, metrics, clusterId)
+  }
+
   def clusterId: String = metaProperties.clusterId
 
   def startup(): Unit = {
@@ -202,6 +212,11 @@ class ControllerServer(
       }
       controller = controllerBuilder.build()
 
+      // Perform any setup that is done only when this node is a controller-only node.
+      if (!config.processRoles.contains(BrokerRole)) {
+        doRemoteKraftSetup()
+      }
+
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
       controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
         authorizer,
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 33511147e6..d7828ce0c0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -30,7 +30,7 @@ import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SslConfigs}
-import org.apache.kafka.common.metrics.MetricsReporter
+import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
@@ -254,7 +254,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       case _ =>
     }
     addReconfigurable(kafkaServer.kafkaYammerMetrics)
-    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
+    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
     addReconfigurable(new DynamicClientQuotaCallback(kafkaServer))
 
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
@@ -744,17 +744,18 @@ class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
   }
 }
 
-class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconfigurable {
+class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) extends Reconfigurable {
+  private val reporterState = new DynamicMetricReporterState(brokerId, config, metrics, clusterId)
+  private val currentReporters = reporterState.currentReporters
+  private val dynamicConfig = reporterState.dynamicConfig
 
-  private val dynamicConfig = server.config.dynamicConfig
-  private val metrics = server.metrics
-  private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> brokerId.toString)
-  private val currentReporters = mutable.Map[String, MetricsReporter]()
+  private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] =
+    reporterState.metricsReporterClasses(configs)
 
-  createReporters(dynamicConfig.currentKafkaConfig.getList(KafkaConfig.MetricReporterClassesProp),
-    Collections.emptyMap[String, Object])
+  private def createReporters(reporterClasses: util.List[String], updatedConfigs: util.Map[String, _]): Unit =
+    reporterState.createReporters(reporterClasses, updatedConfigs)
 
-  private[server] def currentMetricsReporters: List[MetricsReporter] = currentReporters.values.toList
+  private def removeReporter(className: String): Unit = reporterState.removeReporter(className)
 
   override def configure(configs: util.Map[String, _]): Unit = {}
 
@@ -797,8 +798,23 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
     val added = updatedMetricsReporters.filterNot(currentReporters.keySet)
     createReporters(added.asJava, configs)
   }
+}
+
+class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) {
+  private[server] val dynamicConfig = config.dynamicConfig
+  private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> brokerId.toString)
+  private[server] val currentReporters = mutable.Map[String, MetricsReporter]()
+  createReporters(config, clusterId, metricsReporterClasses(dynamicConfig.currentKafkaConfig.values()).asJava,
+    Collections.emptyMap[String, Object])
+
+  private[server] def createReporters(reporterClasses: util.List[String],
+                                      updatedConfigs: util.Map[String, _]): Unit = {
+    createReporters(config, clusterId, reporterClasses, updatedConfigs)
+  }
 
-  private def createReporters(reporterClasses: util.List[String],
+  private def createReporters(config: KafkaConfig,
+                              clusterId: String,
+                              reporterClasses: util.List[String],
                               updatedConfigs: util.Map[String, _]): Unit = {
     val props = new util.HashMap[String, AnyRef]
     updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
@@ -807,19 +823,19 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
     // Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange,
     // which provides that MetricsReporter.contextChange must be called before the first call to MetricsReporter.init.
     // The first call to MetricsReporter.init is done when we call metrics.addReporter below.
-    KafkaBroker.notifyMetricsReporters(server.clusterId, server.config, reporters.asScala)
+    KafkaBroker.notifyMetricsReporters(clusterId, config, reporters.asScala)
     reporters.forEach { reporter =>
       metrics.addReporter(reporter)
       currentReporters += reporter.getClass.getName -> reporter
     }
-    KafkaBroker.notifyClusterListeners(server.clusterId, reporters.asScala)
+    KafkaBroker.notifyClusterListeners(clusterId, reporters.asScala)
   }
 
-  private def removeReporter(className: String): Unit = {
+  private[server] def removeReporter(className: String): Unit = {
     currentReporters.remove(className).foreach(metrics.removeReporter)
   }
 
-  private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
+  private[server] def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
     configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
   }
 }