You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/01 23:07:55 UTC

[GitHub] [kafka] rondagostino opened a new pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

rondagostino opened a new pull request #10019:
URL: https://github.com/apache/kafka/pull/10019


   Dynamic broker reconfiguration needs to occur for both ZooKeeper-based brokers and brokers that use a Raft-based metadata quorum.  `DynamicBrokerConfig` currently operates on `KafkaServer`, but it needs to operate on `BrokerServer` (the broker implementation that will use the Raft metadata log) as well. This PR introduces a `KafkaBroker` trait to allow dynamic reconfiguration to work with either implementation.
   
   Existing tests are sufficient to detect bugs and regressions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r568242355



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -669,7 +669,7 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
     updateLogsConfig(newBrokerDefaults.asScala)
 
     if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !origUncleanLeaderElectionEnable) {
-      server.kafkaController.enableDefaultUncleanLeaderElection()
+      server.zkBasedKafkaController.foreach {_.enableDefaultUncleanLeaderElection() }

Review comment:
       It is idiomatic to use `()` instead of `{}` when the lambda is a single expression. E.g.
   ```scala
          server.zkBasedKafkaController.foreach(_.enableDefaultUncleanLeaderElection())
   ```

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -936,7 +936,7 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
     if (listenersAdded.nonEmpty)
       server.socketServer.addListeners(listenersAdded)
 
-    server.kafkaController.updateBrokerInfo(server.createBrokerInfo)
+    server.zkBasedKafkaController.foreach {_.updateBrokerInfo(server.createBrokerInfo) }

Review comment:
       It is idiomatic to use `()` instead of `{}` when the lambda is a single expression. E.g.
   ```scala
          server.zkBasedKafkaController.foreach(_.enableDefaultUncleanLeaderElection())
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {

Review comment:
       Why does this trait extends `Logging` and `KafkaMetricsGroup`? These feel like an implementation detail.
   
   This looks like a very important trait and concept. Should we add documentation to all of the public methods?

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None

Review comment:
       Ideally, we should not have a reference to ZooKeeper on a trait that is implemented by both the ZK quorum and Raft quorum. Maybe `kafkaController: Option[KafkaController]`.

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics

Review comment:
       The methods `clusterId` and `metrics` have parenthesis `()` while the rest of the methods do not. Should we just remove the parenthesis for these methods? 

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None
+  def createBrokerInfo: BrokerInfo = throw new UnsupportedOperationException("Unsupported in with a Raft-based meadata quorum")

Review comment:
       Why is "Raft-based" mentioned in generic `KafkaBroker` trait? Maybe we can remove the default implementation and force the classes implementing this trait to override the method.

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }

Review comment:
       This code is duplicated in `KafkaServer`.

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None
+  def createBrokerInfo: BrokerInfo = throw new UnsupportedOperationException("Unsupported in with a Raft-based meadata quorum")
+
+
+  newKafkaServerGauge("BrokerState", () => brokerState.currentState)
+  newKafkaServerGauge("ClusterId", () => clusterId())
+  newKafkaServerGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+  val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying)

Review comment:
       I think you can make this `private[this]`.
   
   If you want to make sure that these fields don't get leaked to implementing classes maybe we can move this default metric initialization to a companion object (static in Java) method:
   
   ```scala
   object KafkaBroker {
     def initializeDefaultMetrics(metricsGroup: KafkaMetricsGroup, kafkaServer: KafkaBroker): Unit
   }
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }

Review comment:
       This code is duplicated in `KafkaServer`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10019:
URL: https://github.com/apache/kafka/pull/10019


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#issuecomment-772945162


   LGTM


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#issuecomment-772706382


   I agree with the comments from earlier that we don't want to expose the pre-KIP-500 `KafkaController` in the `KafkaServer` interface.  I think casting to `KafkaServer` or `RaftBrokerServer` is a reasonable solution.  It fits with the general (anti?) pattern in `DynamicBrokerConfig` of reaching directly into broker fields to do stuff.  Arguably a lot of this code should be in the broker class itself for just that reason, so that these broker fields can be private. But, one step at a time.
   
   We should disallow changing broker listener configurations if KIP-500 is enabled, until we implement re-registering the broker.  (This isn't a lot of work but we just haven't done it yet...)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r569000955



##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None
+  def createBrokerInfo: BrokerInfo = throw new UnsupportedOperationException("Unsupported in with a Raft-based meadata quorum")
+
+
+  newKafkaServerGauge("BrokerState", () => brokerState.currentState)
+  newKafkaServerGauge("ClusterId", () => clusterId())
+  newKafkaServerGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+  val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying)

Review comment:
       Agree on access -- I'll make it `private`.  Moving everything else feels like a bit of overkill to me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r568982642



##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None
+  def createBrokerInfo: BrokerInfo = throw new UnsupportedOperationException("Unsupported in with a Raft-based meadata quorum")

Review comment:
       I'm going to remove these in favor or having the caller cast.  See comment below for details.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r568982642



##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None
+  def createBrokerInfo: BrokerInfo = throw new UnsupportedOperationException("Unsupported in with a Raft-based meadata quorum")

Review comment:
       I'm going to remove these in favor or having the caller cast.  See comment below for details.

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None

Review comment:
       It's straightforward for the calling code to cast to a `KafkaServer` or `BrokerServer` since the way to deal with the two cases can be totally different.  I'l push a commit doing this.

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {

Review comment:
       Metrics need to exist for compatibility, of course, and while a pure interface sounds like a great idea, it feels practical to put the metrics here since they are shared by all types of brokers -- they're basically part of the interface we've agreed to maintain for the sake of compatibility.
   
   I started to do the documentation as you suggested, but they're all pretty useless (`// the authorizer`, for example).
   
   I was able to remove `Logging` since it is redundant -- `KafkaMetricsGroup` already has it.

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None
+  def createBrokerInfo: BrokerInfo = throw new UnsupportedOperationException("Unsupported in with a Raft-based meadata quorum")
+
+
+  newKafkaServerGauge("BrokerState", () => brokerState.currentState)
+  newKafkaServerGauge("ClusterId", () => clusterId())
+  newKafkaServerGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+  val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying)

Review comment:
       Agree on access -- I'll make it `private`.  Moving everything else feels like a bit of overkill to me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r568991855



##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {

Review comment:
       Metrics need to exist for compatibility, of course, and while a pure interface sounds like a great idea, it feels practical to put the metrics here since they are shared by all types of brokers -- they're basically part of the interface we've agreed to maintain for the sake of compatibility.
   
   I started to do the documentation as you suggested, but they're all pretty useless (`// the authorizer`, for example).
   
   I was able to remove `Logging` since it is redundant -- `KafkaMetricsGroup` already has it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r568672128



##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None

Review comment:
       Is there a shared interface for the Controller though? It seems that maybe you want to extract the methods that are needed into the trait instead of this approach.

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.KafkaScheduler
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId: String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics: Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  newKafkaServerGauge("BrokerState", () => brokerState.currentState)
+  newKafkaServerGauge("ClusterId", () => clusterId)
+  newKafkaServerGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+  private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying)
+
+  if (linuxIoMetricsCollector.usable()) {
+    newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
+    newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())

Review comment:
       Why don't you have to override the metric name here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10019:
URL: https://github.com/apache/kafka/pull/10019


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r568983285



##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None

Review comment:
       It's straightforward for the calling code to cast to a `KafkaServer` or `BrokerServer` since the way to deal with the two cases can be totally different.  I'l push a commit doing this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r569617960



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -936,7 +939,10 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
     if (listenersAdded.nonEmpty)
       server.socketServer.addListeners(listenersAdded)
 
-    server.kafkaController.updateBrokerInfo(server.createBrokerInfo)
+    server match {
+      case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
+      case _ =>

Review comment:
       We should have a KafkaServer method for changing the configuration.
   
   In ZK mode it will have to update the broker info in the old controller.
   In KIP-500 mode it will re-register with the controller quorum.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r569617960



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -936,7 +939,10 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
     if (listenersAdded.nonEmpty)
       server.socketServer.addListeners(listenersAdded)
 
-    server.kafkaController.updateBrokerInfo(server.createBrokerInfo)
+    server match {
+      case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
+      case _ =>

Review comment:
       Can you log an error message here?  This something we will need to implement (re-registration on listener change) but we don't have time right now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r568672128



##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None

Review comment:
       Is there a shared interface for the Controller though? It seems that maybe you want to extract the methods that are needed into the trait instead of this approach.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r569619467



##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.KafkaScheduler
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId: String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics: Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  newKafkaServerGauge("BrokerState", () => brokerState.currentState)
+  newKafkaServerGauge("ClusterId", () => clusterId)
+  newKafkaServerGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+  private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying)
+
+  if (linuxIoMetricsCollector.usable()) {
+    newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
+    newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())

Review comment:
       Yes, I think the metric name needs to be overridden, so that the mbean name can continue to be `KafkaServer:<whatever>`.  There is some code in the old kip-500 branch that shows how to do it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r568242355



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -669,7 +669,7 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
     updateLogsConfig(newBrokerDefaults.asScala)
 
     if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !origUncleanLeaderElectionEnable) {
-      server.kafkaController.enableDefaultUncleanLeaderElection()
+      server.zkBasedKafkaController.foreach {_.enableDefaultUncleanLeaderElection() }

Review comment:
       It is idiomatic to use `()` instead of `{}` when the lambda is a single expression. E.g.
   ```scala
          server.zkBasedKafkaController.foreach(_.enableDefaultUncleanLeaderElection())
   ```

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -936,7 +936,7 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
     if (listenersAdded.nonEmpty)
       server.socketServer.addListeners(listenersAdded)
 
-    server.kafkaController.updateBrokerInfo(server.createBrokerInfo)
+    server.zkBasedKafkaController.foreach {_.updateBrokerInfo(server.createBrokerInfo) }

Review comment:
       It is idiomatic to use `()` instead of `{}` when the lambda is a single expression. E.g.
   ```scala
          server.zkBasedKafkaController.foreach(_.enableDefaultUncleanLeaderElection())
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {

Review comment:
       Why does this trait extends `Logging` and `KafkaMetricsGroup`? These feel like an implementation detail.
   
   This looks like a very important trait and concept. Should we add documentation to all of the public methods?

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None

Review comment:
       Ideally, we should not have a reference to ZooKeeper on a trait that is implemented by both the ZK quorum and Raft quorum. Maybe `kafkaController: Option[KafkaController]`.

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics

Review comment:
       The methods `clusterId` and `metrics` have parenthesis `()` while the rest of the methods do not. Should we just remove the parenthesis for these methods? 

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None
+  def createBrokerInfo: BrokerInfo = throw new UnsupportedOperationException("Unsupported in with a Raft-based meadata quorum")

Review comment:
       Why is "Raft-based" mentioned in generic `KafkaBroker` trait? Maybe we can remove the default implementation and force the classes implementing this trait to override the method.

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }

Review comment:
       This code is duplicated in `KafkaServer`.

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends Logging with KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId(): String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics(): Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  // must override the following in KafkaServer since they only apply there
+  def zkBasedKafkaController: Option[KafkaController] = None
+  def createBrokerInfo: BrokerInfo = throw new UnsupportedOperationException("Unsupported in with a Raft-based meadata quorum")
+
+
+  newKafkaServerGauge("BrokerState", () => brokerState.currentState)
+  newKafkaServerGauge("ClusterId", () => clusterId())
+  newKafkaServerGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+  val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying)

Review comment:
       I think you can make this `private[this]`.
   
   If you want to make sure that these fields don't get leaked to implementing classes maybe we can move this default metric initialization to a companion object (static in Java) method:
   
   ```scala
   object KafkaBroker {
     def initializeDefaultMetrics(metricsGroup: KafkaMetricsGroup, kafkaServer: KafkaBroker): Unit
   }
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.controller.KafkaController
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, Logging}
+import kafka.zk.BrokerInfo
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }

Review comment:
       This code is duplicated in `KafkaServer`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r569619467



##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.KafkaScheduler
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId: String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics: Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  newKafkaServerGauge("BrokerState", () => brokerState.currentState)
+  newKafkaServerGauge("ClusterId", () => clusterId)
+  newKafkaServerGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+  private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying)
+
+  if (linuxIoMetricsCollector.usable()) {
+    newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
+    newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())

Review comment:
       Yes, I think the metric name needs to be overridden...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10019:
URL: https://github.com/apache/kafka/pull/10019#discussion_r569010031



##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.KafkaScheduler
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId: String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics: Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  newKafkaServerGauge("BrokerState", () => brokerState.currentState)
+  newKafkaServerGauge("ClusterId", () => clusterId)
+  newKafkaServerGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+  private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying)
+
+  if (linuxIoMetricsCollector.usable()) {
+    newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
+    newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())

Review comment:
       Why don't you have to override the metric name here?

##########
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import com.yammer.metrics.{core => yammer}
+import kafka.log.LogManager
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.utils.KafkaScheduler
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.ClusterResource
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, Metrics, MetricsReporter}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+object KafkaBroker {
+  //properties for MetricsContext
+  val metricsPrefix: String = "kafka.server"
+  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(clusterId: String, config: KafkaConfig): KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+    metricsContext
+  }
+
+  private[server] def notifyClusterListeners(clusterId: String,
+                                             clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
+  private[server] def notifyMetricsReporters(clusterId: String,
+                                             config: KafkaConfig,
+                                             metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+}
+
+trait KafkaBroker extends KafkaMetricsGroup {
+  def authorizer: Option[Authorizer]
+  def clusterId: String
+  def config: KafkaConfig
+  def brokerState: BrokerState
+  def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
+  def kafkaScheduler: KafkaScheduler
+  def kafkaYammerMetrics: KafkaYammerMetrics
+  def logManager: LogManager
+  def metrics: Metrics
+  def quotaManagers: QuotaFactory.QuotaManagers
+  def replicaManager: ReplicaManager
+  def socketServer: SocketServer
+
+  newKafkaServerGauge("BrokerState", () => brokerState.currentState)
+  newKafkaServerGauge("ClusterId", () => clusterId)
+  newKafkaServerGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+  private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM, logger.underlying)
+
+  if (linuxIoMetricsCollector.usable()) {
+    newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
+    newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
+  }
+
+  // For backwards compatibility, we need to keep older metrics tied
+  // to their original name when this class was named `KafkaServer`
+  private def newKafkaServerGauge[T](metricName: String, gauge: yammer.Gauge[T]): yammer.Gauge[T] = {
+    val explicitName = explicitMetricName(
+      group = "kafka.server",
+      typeName = "KafkaServer",
+      name = metricName,
+      tags = Map.empty
+    )
+    KafkaYammerMetrics.defaultRegistry().newGauge(explicitName, gauge)

Review comment:
       Check `KafkaZkClient` for an example of how this can be done more concisely.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org