Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/10 09:28:23 UTC

[GitHub] [kafka] tombentley commented on a diff in pull request #12046: KAFKA-10360: Allow disabling JMX Reporter (KIP-830)

tombentley commented on code in PR #12046:
URL: https://github.com/apache/kafka/pull/12046#discussion_r942126401


##########
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##########
@@ -98,6 +102,10 @@ public class CommonClientConfigs {
 
     public static final String METRICS_CONTEXT_PREFIX = "metrics.context.";
 
+    @Deprecated
+    public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = "auto.include.jmx.reporter";
+    public static final String AUTO_INCLUDE_JMX_REPORTER_DOC = "Deprecated. Whether to automatically include JmxReporter even if it's not listed in <code>metric.reporters</code>. This configuration will be removed in Kafka 4.0, users should instead include <code>org.apache.kafka.common.metrics.JmxReporter</code> in <code>metric.reporters</code> in order to enable the JmxReporter.";

Review Comment:
   Let's remember to open a JIRA for the removal of this in 4.0, so we don't forget.



##########
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##########
@@ -216,4 +224,23 @@ public static void postValidateSaslMechanismConfig(AbstractConfig config) {
             }
         }
     }
+
+    public static List<MetricsReporter> metricsReporters(AbstractConfig config) {
+        return metricsReporters(Collections.emptyMap(), config);
+    }
+
+    public static List<MetricsReporter> metricsReporters(String clientId, AbstractConfig config) {
+        return metricsReporters(Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId), config);
+    }
+
+    public static List<MetricsReporter> metricsReporters(Map<String, Object> clientIdOverride, AbstractConfig config) {
+        List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                MetricsReporter.class, clientIdOverride);
+        if (config.getBoolean(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG) && reporters.stream().noneMatch(r -> r instanceof JmxReporter)) {

Review Comment:
   If someone has subclassed `JmxReporter`, then `instanceof` would yield true even though Kafka's own `JmxReporter` is not in the list. So it should probably be `JmxReporter.class.equals(r.getClass())` rather than `instanceof`.
   
   This condition is also repeated in several other places. Can we factor it out into some `shouldAddJmxReporter` method on this class?
   
   Finally, splitting the line at the `&&` would improve readability.
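   
   To illustrate the difference between the two checks, here is a minimal, self-contained sketch. It assumes stand-in types only: the nested `MetricsReporter`, `JmxReporter` and `CustomJmxReporter` below are placeholders rather than Kafka's classes, and the signature of `shouldAddJmxReporter` is a guess at what such a helper might look like, not something that exists in the PR.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: the nested types are placeholders, NOT Kafka's real classes.
class JmxReporterCheckSketch {
    interface MetricsReporter { }
    static class JmxReporter implements MetricsReporter { }
    // A user-defined subclass, the case the instanceof check gets wrong.
    static class CustomJmxReporter extends JmxReporter { }

    // Hypothetical helper: exact class match, so subclasses do not count
    // as "the JmxReporter is already configured".
    static boolean shouldAddJmxReporter(boolean autoInclude,
                                        List<? extends MetricsReporter> reporters) {
        return autoInclude
                && reporters.stream().noneMatch(r -> JmxReporter.class.equals(r.getClass()));
    }

    // The instanceof variant from the diff, kept for comparison.
    static boolean shouldAddJmxReporterInstanceof(boolean autoInclude,
                                                  List<? extends MetricsReporter> reporters) {
        return autoInclude
                && reporters.stream().noneMatch(r -> r instanceof JmxReporter);
    }

    public static void main(String[] args) {
        List<MetricsReporter> onlySubclass = Arrays.asList(new CustomJmxReporter());
        // The subclass satisfies instanceof, so the default reporter is skipped.
        System.out.println(shouldAddJmxReporterInstanceof(true, onlySubclass)); // false
        // The exact class check still adds the default reporter.
        System.out.println(shouldAddJmxReporter(true, onlySubclass)); // true
    }
}
```

   With only a subclass configured, the `instanceof` variant reports that nothing needs adding, while the exact class comparison still adds the default reporter.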



##########
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java:
##########
@@ -53,16 +72,21 @@ public static MetricsRegistry defaultRegistry() {
     }
 
     private final MetricsRegistry metricsRegistry = new MetricsRegistry();
-    private final FilteringJmxReporter jmxReporter = new FilteringJmxReporter(metricsRegistry,
-        metricName -> true);
+    private FilteringJmxReporter jmxReporter;
 
     private KafkaYammerMetrics() {
-        jmxReporter.start();
-        Exit.addShutdownHook("kafka-jmx-shutdown-hook", jmxReporter::shutdown);
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     public void configure(Map<String, ?> configs) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
+        List<String> reporters = config.getList(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG);
+        if (config.getBoolean(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG) || reporters.stream().anyMatch(r -> JmxReporter.class.getName().equals(r))) {

Review Comment:
   Here's another possible call site for `shouldAddJmxReporter`.



##########
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java:
##########
@@ -53,16 +72,21 @@ public static MetricsRegistry defaultRegistry() {
     }
 
     private final MetricsRegistry metricsRegistry = new MetricsRegistry();
-    private final FilteringJmxReporter jmxReporter = new FilteringJmxReporter(metricsRegistry,
-        metricName -> true);
+    private FilteringJmxReporter jmxReporter;
 
     private KafkaYammerMetrics() {
-        jmxReporter.start();

Review Comment:
   Previously the JMX reporter was started as soon as the `KafkaYammerMetrics` class was initialized, but now it's started only once `configure()` is called. In `BrokerServer`, `configure()` is not called immediately on access of `KafkaYammerMetrics.INSTANCE`. It's not clear from a quick look at the code that initialization ever happens or, if it does, what might go unobserved due to the deferred start.



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -839,8 +840,14 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me
     currentReporters.remove(className).foreach(metrics.removeReporter)
   }
 
+  @nowarn("cat=deprecation")
   private[server] def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
-    configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
+    val reporters = mutable.Buffer[String]()
+    reporters ++= configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
+    if (configs.get(KafkaConfig.AutoIncludeJmxReporterProp).asInstanceOf[Boolean] && !reporters.contains(classOf[JmxReporter].getName)) {

Review Comment:
   This is a call site for the `shouldAddJmxReporter` method I mentioned.
   
   And if we don't add `shouldAddJmxReporter`, then surely we should use exactly the same test for what `reporters` contains everywhere (whether `instanceof`, `Class.equals()`, or `.equals()` on the class name)?
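   
   One way to get a single test, sketched below under stated assumptions, is to compare class names everywhere, so that call sites holding a list of class names from config and call sites holding reporter instances share one predicate. The class `JmxReporterNameCheckSketch` and its methods are hypothetical. The only name taken from the PR is the fully qualified `JmxReporter` class name quoted in the config doc string.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch only: a name-based check shared by every call site.
class JmxReporterNameCheckSketch {
    // Fully qualified name as quoted in AUTO_INCLUDE_JMX_REPORTER_DOC.
    static final String JMX_REPORTER_CLASS_NAME = "org.apache.kafka.common.metrics.JmxReporter";

    // Hypothetical helper: true when auto-include is on and the exact
    // JmxReporter class name is not already listed.
    static boolean shouldAddJmxReporter(boolean autoInclude, List<String> reporterClassNames) {
        return autoInclude
                && !reporterClassNames.contains(JMX_REPORTER_CLASS_NAME);
    }

    // Call sites that hold instances map them to class names first, which
    // is equivalent to an exact Class.equals() check (subclasses do not match).
    static List<String> classNames(List<?> reporters) {
        return reporters.stream()
                .map(r -> r.getClass().getName())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> fromConfig = Arrays.asList("com.example.MyReporter");
        System.out.println(shouldAddJmxReporter(true, fromConfig)); // true
        System.out.println(shouldAddJmxReporter(true,
                Arrays.asList(JMX_REPORTER_CLASS_NAME))); // false
    }
}
```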



##########
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java:
##########
@@ -78,8 +102,10 @@ public void validateReconfiguration(Map<String, ?> configs) throws ConfigExcepti
 
     @Override
     public void reconfigure(Map<String, ?> configs) {
-        Predicate<String> mBeanPredicate = JmxReporter.compilePredicate(configs);
-        jmxReporter.updatePredicate(metricName -> mBeanPredicate.test(metricName.getMBeanName()));
+        if (jmxReporter != null) {
+            Predicate<String> mBeanPredicate = JmxReporter.compilePredicate(configs);
+            jmxReporter.updatePredicate(metricName -> mBeanPredicate.test(metricName.getMBeanName()));
+        }

Review Comment:
   Do we need to cover the case where the JMX reporter is initially configured and then dynamically removed, and therefore needs to be `shutdown()`? And the other way around, too?
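   
   A rough sketch of the transitions I have in mind, assuming the reporter can be toggled at reconfiguration time at all. `FakeJmxReporter` and `JmxLifecycleSketch` are hypothetical stand-ins, not `FilteringJmxReporter` or `KafkaYammerMetrics`, and the boolean parameter stands for whatever "is JMX enabled in the new configs" check we end up with.

```java
// Sketch only: stand-in types showing both enable and disable transitions.
class JmxLifecycleSketch {
    // Placeholder for a reporter with a start/shutdown lifecycle.
    static class FakeJmxReporter {
        boolean running;
        void start() { running = true; }
        void shutdown() { running = false; }
    }

    // null means the JMX reporter is currently disabled.
    private FakeJmxReporter jmxReporter;

    void reconfigure(boolean jmxEnabled) {
        if (jmxEnabled && jmxReporter == null) {
            // Disabled -> enabled: create and start the reporter.
            jmxReporter = new FakeJmxReporter();
            jmxReporter.start();
        } else if (!jmxEnabled && jmxReporter != null) {
            // Enabled -> disabled: shut down and drop the reference.
            jmxReporter.shutdown();
            jmxReporter = null;
        }
        // Otherwise the state is unchanged; a predicate update would go here.
    }

    boolean isRunning() {
        return jmxReporter != null && jmxReporter.running;
    }

    public static void main(String[] args) {
        JmxLifecycleSketch metrics = new JmxLifecycleSketch();
        metrics.reconfigure(true);
        System.out.println(metrics.isRunning()); // true
        metrics.reconfigure(false);
        System.out.println(metrics.isRunning()); // false
    }
}
```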


