Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/09 20:36:27 UTC

[kafka] 07/07: KAFKA-14051: Create metrics reporters in KRaft remote controllers (#12396)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8c8cb111a4fb6f17d80fb55393ff4ffff03828a6
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Thu Jul 21 19:59:05 2022 -0400

    KAFKA-14051: Create metrics reporters in KRaft remote controllers (#12396)
    
    KRaft remote controllers do not yet support dynamic reconfiguration (https://issues.apache.org/jira/browse/KAFKA-14057). Until that is implemented, the problem is that the configured metric reporters are instantiated as part of the wiring for dynamic reconfiguration. Since that wiring does not yet exist for KRaft remote controllers, this patch refactors the instantiation of the metric reporters out of their reconfiguration, and adjusts the controller startup to create the reporters explicitly when the node is a controller-only (remote) KRaft controller. (The decomposition is sketched just after the diffstat below.)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../main/scala/kafka/server/ControllerServer.scala | 15 +++++++
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 46 +++++++++++++++-------
 2 files changed, 46 insertions(+), 15 deletions(-)
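
The shape of the refactoring, as a minimal self-contained Scala sketch: reporter
instantiation moves into a plain state class that any node can construct, while a
Reconfigurable-style wrapper merely delegates to it. The names here (Reporter,
MetricsStub, ReporterState, ReconfigurableReporters) are illustrative stand-ins,
not the real Kafka classes shown in the diff below.

import scala.collection.mutable

// Stand-ins for the Kafka metrics types (illustrative only).
trait Reporter { def init(): Unit; def close(): Unit }
class MetricsStub {
  def addReporter(r: Reporter): Unit = r.init()
  def removeReporter(r: Reporter): Unit = r.close()
}

// After the refactor: instantiation lives in a state class that a remote
// controller can construct directly, without dynamic-reconfiguration wiring.
class ReporterState(metrics: MetricsStub, initial: Seq[Reporter]) {
  val current = mutable.Map[String, Reporter]()
  initial.foreach(add) // reporters are created eagerly at construction time

  def add(r: Reporter): Unit = {
    metrics.addReporter(r)
    current += r.getClass.getName -> r
  }

  def remove(className: String): Unit =
    current.remove(className).foreach(metrics.removeReporter)
}

// The Reconfigurable-style wrapper layers reconfiguration on top of the
// state; brokers register it, controller-only nodes skip it entirely.
class ReconfigurableReporters(state: ReporterState) {
  def reconfigure(updated: Seq[Reporter]): Unit = {
    val updatedNames = updated.map(_.getClass.getName).toSet
    state.current.keySet.filterNot(updatedNames).toList.foreach(state.remove)
    updated.filterNot(r => state.current.contains(r.getClass.getName)).foreach(state.add)
  }
}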

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 28c98643c3a..212c092e1ab 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -27,6 +27,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.RaftManager
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
+import kafka.server.KafkaRaftServer.BrokerRole
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.clients.ApiVersions
@@ -70,6 +71,8 @@ class ControllerServer(
 ) extends Logging with KafkaMetricsGroup {
   import kafka.server.Server._
 
+  config.dynamicConfig.initialize(zkClientOpt = None)
+
   val lock = new ReentrantLock()
   val awaitShutdownCond = lock.newCondition()
   var status: ProcessStatus = SHUTDOWN
@@ -99,6 +102,13 @@ class ControllerServer(
     true
   }
 
+  private def doRemoteKraftSetup(): Unit = {
+    // Explicitly configure metric reporters on this remote controller.
+    // We do not yet support dynamic reconfiguration on remote controllers in general;
+    // remove this once that is implemented.
+    new DynamicMetricReporterState(config.nodeId, config, metrics, clusterId)
+  }
+
   def clusterId: String = metaProperties.clusterId
 
   def startup(): Unit = {
@@ -206,6 +216,11 @@ class ControllerServer(
       }
       controller = controllerBuilder.build()
 
+      // Perform any setup that is done only when this node is a controller-only node.
+      if (!config.processRoles.contains(BrokerRole)) {
+        doRemoteKraftSetup()
+      }
+
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
       controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
         authorizer,
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 76a42b74fa5..a860938124e 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -30,7 +30,7 @@ import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SslConfigs}
-import org.apache.kafka.common.metrics.MetricsReporter
+import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
@@ -258,7 +258,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       case _ =>
     }
     addReconfigurable(kafkaServer.kafkaYammerMetrics)
-    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
+    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
     addReconfigurable(new DynamicClientQuotaCallback(kafkaServer))
 
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
@@ -748,17 +748,18 @@ class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
   }
 }
 
-class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconfigurable {
+class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) extends Reconfigurable {
+  private val reporterState = new DynamicMetricReporterState(brokerId, config, metrics, clusterId)
+  private val currentReporters = reporterState.currentReporters
+  private val dynamicConfig = reporterState.dynamicConfig
 
-  private val dynamicConfig = server.config.dynamicConfig
-  private val metrics = server.metrics
-  private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> brokerId.toString)
-  private val currentReporters = mutable.Map[String, MetricsReporter]()
+  private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] =
+    reporterState.metricsReporterClasses(configs)
 
-  createReporters(dynamicConfig.currentKafkaConfig.getList(KafkaConfig.MetricReporterClassesProp),
-    Collections.emptyMap[String, Object])
+  private def createReporters(reporterClasses: util.List[String], updatedConfigs: util.Map[String, _]): Unit =
+    reporterState.createReporters(reporterClasses, updatedConfigs)
 
-  private[server] def currentMetricsReporters: List[MetricsReporter] = currentReporters.values.toList
+  private def removeReporter(className: String): Unit = reporterState.removeReporter(className)
 
   override def configure(configs: util.Map[String, _]): Unit = {}
 
@@ -801,8 +802,23 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
     val added = updatedMetricsReporters.filterNot(currentReporters.keySet)
     createReporters(added.asJava, configs)
   }
+}
+
+class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) {
+  private[server] val dynamicConfig = config.dynamicConfig
+  private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> brokerId.toString)
+  private[server] val currentReporters = mutable.Map[String, MetricsReporter]()
+  createReporters(config, clusterId, metricsReporterClasses(dynamicConfig.currentKafkaConfig.values()).asJava,
+    Collections.emptyMap[String, Object])
+
+  private[server] def createReporters(reporterClasses: util.List[String],
+                                      updatedConfigs: util.Map[String, _]): Unit = {
+    createReporters(config, clusterId, reporterClasses, updatedConfigs)
+  }
 
-  private def createReporters(reporterClasses: util.List[String],
+  private def createReporters(config: KafkaConfig,
+                              clusterId: String,
+                              reporterClasses: util.List[String],
                               updatedConfigs: util.Map[String, _]): Unit = {
     val props = new util.HashMap[String, AnyRef]
     updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
@@ -811,19 +827,19 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
     // Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange,
     // which provides that MetricsReporter.contextChange must be called before the first call to MetricsReporter.init.
     // The first call to MetricsReporter.init is done when we call metrics.addReporter below.
-    KafkaBroker.notifyMetricsReporters(server.clusterId, server.config, reporters.asScala)
+    KafkaBroker.notifyMetricsReporters(clusterId, config, reporters.asScala)
     reporters.forEach { reporter =>
       metrics.addReporter(reporter)
       currentReporters += reporter.getClass.getName -> reporter
     }
-    KafkaBroker.notifyClusterListeners(server.clusterId, reporters.asScala)
+    KafkaBroker.notifyClusterListeners(clusterId, reporters.asScala)
   }
 
-  private def removeReporter(className: String): Unit = {
+  private[server] def removeReporter(className: String): Unit = {
     currentReporters.remove(className).foreach(metrics.removeReporter)
   }
 
-  private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
+  private[server] def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
     configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
   }
 }
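
For reference, the ordering contract that createReporters preserves above can be
shown in isolation: MetricsReporter.contextChange must be observed before the
first call to MetricsReporter.init, and init is triggered by Metrics.addReporter.
A minimal Scala sketch with stand-in types, not the real
org.apache.kafka.common.metrics API:

trait ContextAwareReporter {
  def contextChange(clusterId: String): Unit
  def init(): Unit
}

class MetricsRegistry {
  // The first call to init happens inside addReporter, mirroring Metrics.addReporter.
  def addReporter(r: ContextAwareReporter): Unit = r.init()
}

object ReporterWiring {
  def wire(clusterId: String,
           reporters: Seq[ContextAwareReporter],
           metrics: MetricsRegistry): Unit = {
    reporters.foreach(_.contextChange(clusterId)) // satisfy the contract first...
    reporters.foreach(metrics.addReporter)        // ...then register, triggering init
  }
}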