You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/09/25 05:02:58 UTC

[kafka] branch trunk updated: KAFKA-5066; Add KafkaMetricsConfig (Yammer metrics reporters) props to documentation

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c835331  KAFKA-5066; Add KafkaMetricsConfig (Yammer metrics reporters) props to documentation
c835331 is described below

commit c835331c7855c4bd60b3eb82b8f284006a1ee9b7
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Mon Sep 24 22:01:00 2018 -0700

    KAFKA-5066; Add KafkaMetricsConfig (Yammer metrics reporters) props to documentation
    
    Author: Manikumar Reddy <ma...@gmail.com>
    
    Reviewers: Dong Lin <li...@gmail.com>
    
    Closes #5563 from omkreddy/KAFKA-5066-KAFKA-METRICS-CONFIG
---
 .../scala/kafka/metrics/KafkaMetricsConfig.scala   |  9 +++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 24 ++++++++++++++++++++++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  5 +++++
 .../server/KafkaMetricReporterClusterIdTest.scala  |  2 +-
 4 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
index ad9eb20..a3a21bd 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
@@ -20,7 +20,8 @@
 
 package kafka.metrics
 
-import kafka.utils.{VerifiableProperties, CoreUtils}
+import kafka.server.{Defaults, KafkaConfig}
+import kafka.utils.{CoreUtils, VerifiableProperties}
 
 class KafkaMetricsConfig(props: VerifiableProperties) {
 
@@ -28,10 +29,12 @@ class KafkaMetricsConfig(props: VerifiableProperties) {
    * Comma-separated list of reporter types. These classes should be on the
    * classpath and will be instantiated at run-time.
    */
-  val reporters = CoreUtils.parseCsvList(props.getString("kafka.metrics.reporters", ""))
+  val reporters = CoreUtils.parseCsvList(props.getString(KafkaConfig.KafkaMetricsReporterClassesProp,
+    Defaults.KafkaMetricReporterClasses))
 
   /**
    * The metrics polling interval (in seconds).
    */
-  val pollingIntervalSecs = props.getInt("kafka.metrics.polling.interval.secs", 10)
+  val pollingIntervalSecs = props.getInt(KafkaConfig.KafkaMetricsPollingIntervalSecondsProp,
+    Defaults.KafkaMetricsPollingIntervalSeconds)
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1bc9707..9022502 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -203,6 +203,11 @@ object Defaults {
   val MetricReporterClasses = ""
   val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
 
+
+  /** ********* Kafka Yammer Metrics Reporter Configuration ***********/
+  val KafkaMetricReporterClasses = ""
+  val KafkaMetricsPollingIntervalSeconds = 10
+
   /** ********* SSL configuration ***********/
   val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL
   val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS
@@ -410,6 +415,10 @@ object KafkaConfig {
   val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
   val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
 
+  /** ********* Kafka Yammer Metrics Reporters Configuration ***********/
+  val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters"
+  val KafkaMetricsPollingIntervalSecondsProp = "kafka.metrics.polling.interval.secs"
+
   /** ******** Common Security Configuration *************/
   val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
 
@@ -720,6 +729,17 @@ object KafkaConfig {
   val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
   val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
 
+
+  /** ********* Kafka Yammer Metrics Reporter Configuration ***********/
+  val KafkaMetricsReporterClassesDoc = "A list of classes to use as Yammer metrics custom reporters." +
+    " The reporters should implement <code>kafka.metrics.KafkaMetricsReporter</code> trait. If a client wants" +
+    " to expose JMX operations on a custom reporter, the custom reporter needs to additionally implement an MBean" +
+    " trait that extends <code>kafka.metrics.KafkaMetricsReporterMBean</code> trait so that the registered MBean is compliant with" +
+    " the standard MBean convention."
+
+  val KafkaMetricsPollingIntervalSecondsDoc = s"The metrics polling interval (in seconds) which can be used" +
+    s" in $KafkaMetricsReporterClassesProp implementations."
+
   /** ******** Common Security Configuration *************/
   val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
 
@@ -945,6 +965,10 @@ object KafkaConfig {
       .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
       .define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc)
 
+      /** ********* Kafka Yammer Metrics Reporter Configuration for docs ***********/
+      .define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc)
+      .define(KafkaMetricsPollingIntervalSecondsProp, INT, Defaults.KafkaMetricsPollingIntervalSeconds, atLeast(1), LOW, KafkaMetricsPollingIntervalSecondsDoc)
+
       /** ********* Quota configuration ***********/
       .define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)
       .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b7a8951..b75c3e7 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 import kafka.api.{ApiVersion, KAFKA_0_8_2}
 import kafka.cluster.EndPoint
 import kafka.message._
+import kafka.metrics.KafkaMetricsConfig
 import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.metrics.Sensor
@@ -724,6 +725,10 @@ class KafkaConfigTest {
         case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
 
+        //Kafka Yammer metrics reporter configs
+        case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
+        case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore
+
         case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
       }
     })
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
index dfcb4ac..4d514a0 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
@@ -83,7 +83,7 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
     val props = TestUtils.createBrokerConfig(1, zkConnect)
-    props.setProperty("kafka.metrics.reporters", "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
+    props.setProperty(KafkaConfig.KafkaMetricsReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
     props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter")
     props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
     props.setProperty(KafkaConfig.BrokerIdProp, "-1")