You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/08/24 16:30:50 UTC
[kafka] branch trunk updated: KAFKA-10360: Allow disabling JMX Reporter (KIP-830) (#12046)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0507597597e KAFKA-10360: Allow disabling JMX Reporter (KIP-830) (#12046)
0507597597e is described below
commit 0507597597e1d2d9adaa33b87e5ea509e12fd2f0
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Wed Aug 24 18:30:31 2022 +0200
KAFKA-10360: Allow disabling JMX Reporter (KIP-830) (#12046)
This implements KIP-830: https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter
It adds a new configuration `auto.include.jmx.reporter` that can be set to false to disable the JMX Reporter. This configuration is deprecated from its introduction and will be removed in the next major version (Kafka 4.0); after that, users should explicitly include `org.apache.kafka.common.metrics.JmxReporter` in `metric.reporters` to enable it.
Reviewers: Tom Bentley <tb...@redhat.com>, Christo Lolov <ch...@yahoo.com>
---
.../apache/kafka/clients/CommonClientConfigs.java | 28 ++++++++
.../kafka/clients/admin/AdminClientConfig.java | 9 +++
.../kafka/clients/admin/KafkaAdminClient.java | 16 ++---
.../kafka/clients/consumer/ConsumerConfig.java | 11 ++++
.../kafka/clients/consumer/KafkaConsumer.java | 7 +-
.../kafka/clients/producer/KafkaProducer.java | 8 +--
.../kafka/clients/producer/ProducerConfig.java | 9 +++
.../kafka/clients/CommonClientConfigsTest.java | 45 ++++++++++++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 77 ++++++++++++++++------
.../kafka/clients/consumer/KafkaConsumerTest.java | 22 +++++++
.../kafka/clients/producer/KafkaProducerTest.java | 26 +++++++-
.../connect/mirror/MirrorConnectorConfig.java | 15 +++--
.../connect/mirror/MirrorConnectorConfigTest.java | 22 +++++++
.../kafka/connect/runtime/ConnectMetrics.java | 7 +-
.../apache/kafka/connect/runtime/WorkerConfig.java | 8 +++
.../runtime/distributed/WorkerGroupMember.java | 9 +--
.../kafka/connect/runtime/ConnectMetricsTest.java | 26 ++++++++
.../runtime/distributed/WorkerGroupMemberTest.java | 38 ++++++++++-
.../scala/kafka/server/DynamicBrokerConfig.scala | 14 +++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 6 ++
core/src/main/scala/kafka/server/Server.scala | 21 ++----
.../kafka/server/DynamicBrokerConfigTest.scala | 57 +++++++++++++++-
.../org/apache/kafka/streams/KafkaStreams.java | 14 ++--
.../org/apache/kafka/streams/StreamsConfig.java | 9 +++
24 files changed, 406 insertions(+), 98 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index b142867abc9..d88aa0a6a1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -19,12 +19,16 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -98,6 +102,10 @@ public class CommonClientConfigs {
public static final String METRICS_CONTEXT_PREFIX = "metrics.context.";
+ @Deprecated
+ public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = "auto.include.jmx.reporter";
+ public static final String AUTO_INCLUDE_JMX_REPORTER_DOC = "Deprecated. Whether to automatically include JmxReporter even if it's not listed in <code>metric.reporters</code>. This configuration will be removed in Kafka 4.0, users should instead include <code>org.apache.kafka.common.metrics.JmxReporter</code> in <code>metric.reporters</code> in order to enable the JmxReporter.";
+
public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " +
Utils.join(SecurityProtocol.names(), ", ") + ".";
@@ -216,4 +224,24 @@ public class CommonClientConfigs {
}
}
}
+
+ public static List<MetricsReporter> metricsReporters(AbstractConfig config) {
+ return metricsReporters(Collections.emptyMap(), config);
+ }
+
+ public static List<MetricsReporter> metricsReporters(String clientId, AbstractConfig config) {
+ return metricsReporters(Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId), config);
+ }
+
+ public static List<MetricsReporter> metricsReporters(Map<String, Object> clientIdOverride, AbstractConfig config) {
+ List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class, clientIdOverride);
+ if (config.getBoolean(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG) &&
+ reporters.stream().noneMatch(r -> JmxReporter.class.equals(r.getClass()))) {
+ JmxReporter jmxReporter = new JmxReporter();
+ jmxReporter.configure(config.originals(clientIdOverride));
+ reporters.add(jmxReporter);
+ }
+ return reporters;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 37af3864103..0afcd709f04 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -102,6 +102,10 @@ public class AdminClientConfig extends AbstractConfig {
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
private static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;
+ @Deprecated
+ public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
+ public static final String AUTO_INCLUDE_JMX_REPORTER_DOC = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC;
+
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;
@@ -198,6 +202,11 @@ public class AdminClientConfig extends AbstractConfig {
in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),
Importance.LOW,
METRICS_RECORDING_LEVEL_DOC)
+ .define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+ AUTO_INCLUDE_JMX_REPORTER_DOC)
.define(CLIENT_DNS_LOOKUP_CONFIG,
Type.STRING,
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e5df779b616..95fb0ed0d70 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -153,7 +153,6 @@ import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
-import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -354,7 +353,7 @@ public class KafkaAdminClient extends AdminClient {
/**
* The metrics for this KafkaAdminClient.
*/
- private final Metrics metrics;
+ final Metrics metrics;
/**
* The network client to use.
@@ -451,6 +450,10 @@ public class KafkaAdminClient extends AdminClient {
return "adminclient-" + ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement();
}
+ String getClientId() {
+ return clientId;
+ }
+
/**
* Get the deadline for a particular call.
*
@@ -506,17 +509,12 @@ public class KafkaAdminClient extends AdminClient {
config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
- List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class,
- Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));
+ List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
- JmxReporter jmxReporter = new JmxReporter();
- jmxReporter.configure(config.originals());
- reporters.add(jmxReporter);
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
metrics = new Metrics(metricConfig, reporters, time, metricsContext);
@@ -3344,7 +3342,7 @@ public class KafkaAdminClient extends AdminClient {
ListGroupsRequest.Builder createRequest(int timeoutMs) {
List<String> states = options.states()
.stream()
- .map(s -> s.toString())
+ .map(ConsumerGroupState::toString)
.collect(Collectors.toList());
return new ListGroupsRequest.Builder(new ListGroupsRequestData().setStatesFilter(states));
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 5a217705290..f8bc97ec8a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -239,6 +239,12 @@ public class ConsumerConfig extends AbstractConfig {
*/
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+ /**
+ * <code>auto.include.jmx.reporter</code>
+ * */
+ @Deprecated
+ public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
+
/**
* <code>check.crcs</code>
*/
@@ -488,6 +494,11 @@ public class ConsumerConfig extends AbstractConfig {
new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+ .define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+ CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6ffb772915d..f07846945a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -43,7 +43,6 @@ import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -876,11 +875,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricsTags);
- List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
- JmxReporter jmxReporter = new JmxReporter();
- jmxReporter.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)));
- reporters.add(jmxReporter);
+ List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
return new Metrics(metricConfig, reporters, time, metricsContext);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ec8b8725c88..b0421fbd6e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -55,7 +55,6 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -365,12 +364,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
- List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class,
- Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
- JmxReporter jmxReporter = new JmxReporter();
- jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
- reporters.add(jmxReporter);
+ List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index aff5e49cfcb..df760a79c0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -230,6 +230,10 @@ public class ProducerConfig extends AbstractConfig {
/** <code>metric.reporters</code> */
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+ /** <code>auto.include.jmx.reporter</code> */
+ @Deprecated
+ public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
+
// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
@@ -406,6 +410,11 @@ public class ProducerConfig extends AbstractConfig {
new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+ .define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+ CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
Type.INT,
5,
diff --git a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
index 8b33868aaf0..c38f3e6a030 100644
--- a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
@@ -21,12 +21,15 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -36,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class CommonClientConfigsTest {
+ @SuppressWarnings("deprecation")
private static class TestConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
static {
@@ -62,7 +66,18 @@ public class CommonClientConfigsTest {
ConfigDef.Type.STRING,
SaslConfigs.DEFAULT_SASL_MECHANISM,
ConfigDef.Importance.MEDIUM,
- SaslConfigs.SASL_MECHANISM_DOC);
+ SaslConfigs.SASL_MECHANISM_DOC)
+ .define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+ ConfigDef.Type.LIST,
+ Collections.emptyList(),
+ new ConfigDef.NonNullValidator(),
+ ConfigDef.Importance.LOW,
+ CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+ .define(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+ ConfigDef.Type.BOOLEAN,
+ true,
+ ConfigDef.Importance.LOW,
+ CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC);
}
@Override
@@ -114,4 +129,32 @@ public class CommonClientConfigsTest {
ce = assertThrows(ConfigException.class, () -> new TestConfig(configs));
assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));
}
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testMetricsReporters() {
+ TestConfig config = new TestConfig(Collections.emptyMap());
+ List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters("clientId", config);
+ assertEquals(1, reporters.size());
+ assertTrue(reporters.get(0) instanceof JmxReporter);
+
+ config = new TestConfig(Collections.singletonMap(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false"));
+ reporters = CommonClientConfigs.metricsReporters("clientId", config);
+ assertTrue(reporters.isEmpty());
+
+ config = new TestConfig(Collections.singletonMap(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, JmxReporter.class.getName()));
+ reporters = CommonClientConfigs.metricsReporters("clientId", config);
+ assertEquals(1, reporters.size());
+ assertTrue(reporters.get(0) instanceof JmxReporter);
+
+ Map<String, String> props = new HashMap<>();
+ props.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, JmxReporter.class.getName() + "," + MyJmxReporter.class.getName());
+ config = new TestConfig(props);
+ reporters = CommonClientConfigs.metricsReporters("clientId", config);
+ assertEquals(2, reporters.size());
+ }
+
+ public static class MyJmxReporter extends JmxReporter {
+ public MyJmxReporter() {}
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 193457655a5..9ee76c1cc57 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -209,6 +209,7 @@ import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -232,6 +233,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -342,6 +344,41 @@ public class KafkaAdminClientTest {
KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
}
+ @Test
+ public void testMetricsReporterAutoGeneratedClientId() {
+ Properties props = new Properties();
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+ KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
+
+ MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) admin.metrics.reporters().get(0);
+
+ assertEquals(admin.getClientId(), mockMetricsReporter.clientId);
+ assertEquals(2, admin.metrics.reporters().size());
+ admin.close();
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testDisableJmxReporter() {
+ Properties props = new Properties();
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(AdminClientConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+ KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
+ assertTrue(admin.metrics.reporters().isEmpty());
+ admin.close();
+ }
+
+ @Test
+ public void testExplicitlyEnableJmxReporter() {
+ Properties props = new Properties();
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+ KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
+ assertEquals(1, admin.metrics.reporters().size());
+ admin.close();
+ }
+
private static Cluster mockCluster(int numNodes, int controllerIndex) {
HashMap<Integer, Node> nodes = new HashMap<>();
for (int i = 0; i < numNodes; i++)
@@ -820,7 +857,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(RequestTestUtils.metadataResponse(env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
1,
- Collections.<MetadataResponse.TopicMetadata>emptyList()));
+ Collections.emptyList()));
env.kafkaClient().prepareResponseFrom(
prepareCreateTopicsResponse("myTopic", Errors.NONE),
env.cluster().nodeById(1));
@@ -1321,7 +1358,7 @@ public class KafkaAdminClientTest {
}
@Test
- public void testAdminClientApisAuthenticationFailure() throws Exception {
+ public void testAdminClientApisAuthenticationFailure() {
Cluster cluster = mockBootstrapCluster();
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
@@ -1333,7 +1370,7 @@ public class KafkaAdminClientTest {
}
}
- private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
+ private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) {
ExecutionException e = assertThrows(ExecutionException.class, () -> env.adminClient().createTopics(
singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(10000)).all().get());
@@ -1365,7 +1402,7 @@ public class KafkaAdminClientTest {
"Expected an authentication error, but got " + Utils.stackTrace(e));
}
- private void callClientQuotasApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
+ private void callClientQuotasApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) {
ExecutionException e = assertThrows(ExecutionException.class,
() -> env.adminClient().describeClientQuotas(ClientQuotaFilter.all()).entities().get());
assertTrue(e.getCause() instanceof AuthenticationException,
@@ -1615,7 +1652,7 @@ public class KafkaAdminClientTest {
}
@Test
- public void testDescribeConfigsPartialResponse() throws Exception {
+ public void testDescribeConfigsPartialResponse() {
ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "topic");
ConfigResource topic2 = new ConfigResource(ConfigResource.Type.TOPIC, "topic2");
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -2242,8 +2279,8 @@ public class KafkaAdminClientTest {
partitionInfos.add(new PartitionInfo("my_topic", 3, nodes.get(0), new Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
partitionInfos.add(new PartitionInfo("my_topic", 4, nodes.get(0), new Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
Cluster cluster = new Cluster("mockClusterId", nodes.values(),
- partitionInfos, Collections.<String>emptySet(),
- Collections.<String>emptySet(), nodes.get(0));
+ partitionInfos, Collections.emptySet(),
+ Collections.emptySet(), nodes.get(0));
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
@@ -4685,8 +4722,8 @@ public class KafkaAdminClientTest {
"mockClusterId",
Arrays.asList(node0),
pInfos,
- Collections.<String>emptySet(),
- Collections.<String>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
node0);
final TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -4755,8 +4792,8 @@ public class KafkaAdminClientTest {
"mockClusterId",
nodes,
pInfos,
- Collections.<String>emptySet(),
- Collections.<String>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
node0);
final TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -4822,8 +4859,8 @@ public class KafkaAdminClientTest {
"mockClusterId",
nodes,
pInfos,
- Collections.<String>emptySet(),
- Collections.<String>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
node0);
final TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -5294,8 +5331,8 @@ public class KafkaAdminClientTest {
"mockClusterId",
nodes,
pInfos,
- Collections.<String>emptySet(),
- Collections.<String>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
node0);
final TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -5469,8 +5506,8 @@ public class KafkaAdminClientTest {
"mockClusterId",
nodes,
pInfos,
- Collections.<String>emptySet(),
- Collections.<String>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
node0);
final TopicPartition tp1 = new TopicPartition("foo", 0);
@@ -5501,8 +5538,8 @@ public class KafkaAdminClientTest {
"mockClusterId",
nodes,
pInfos,
- Collections.<String>emptySet(),
- Collections.<String>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
node0);
final TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -5956,7 +5993,7 @@ public class KafkaAdminClientTest {
}
@Test
- public void testAlterUserScramCredentialsUnknownMechanism() throws Exception {
+ public void testAlterUserScramCredentialsUnknownMechanism() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index e7f25345c6d..47370d38270 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -209,6 +209,28 @@ public class KafkaConsumerTest {
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metrics.reporters().get(0);
assertEquals(consumer.getClientId(), mockMetricsReporter.clientId);
+ assertEquals(2, consumer.metrics.reporters().size());
+ consumer.close();
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testDisableJmxReporter() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
+ assertTrue(consumer.metrics.reporters().isEmpty());
+ consumer.close();
+ }
+
+ @Test
+ public void testExplicitlyEnableJmxReporter() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
+ assertEquals(1, consumer.metrics.reporters().size());
consumer.close();
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index dc7db382a62..a1a854fc671 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -172,7 +172,7 @@ public class KafkaProducerTest {
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
- return new KafkaProducer<K, V>(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
+ return new KafkaProducer<>(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
keySerializer, valueSerializer, metadata, kafkaClient, interceptors, time);
}
@@ -455,6 +455,28 @@ public class KafkaProducerTest {
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().get(0);
assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
+ assertEquals(2, producer.metrics.reporters().size());
+ producer.close();
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testDisableJmxReporter() {
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+ assertTrue(producer.metrics.reporters().isEmpty());
+ producer.close();
+ }
+
+ @Test
+ public void testExplicitlyEnableJmxReporter() {
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+ assertEquals(1, producer.metrics.reporters().size());
producer.close();
}
@@ -1901,7 +1923,7 @@ public class KafkaProducerTest {
}
@Test
- public void testCallbackAndInterceptorHandleError() throws Exception {
+ public void testCallbackAndInterceptorHandleError() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index d59f4bc7664..e367d3aab72 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.config.ConfigDef.ValidString;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -320,11 +319,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
}
List<MetricsReporter> metricsReporters() {
- List<MetricsReporter> reporters = getConfiguredInstances(
- CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
- JmxReporter jmxReporter = new JmxReporter();
- jmxReporter.configure(this.originals());
- reporters.add(jmxReporter);
+ List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this);
MetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror");
for (MetricsReporter reporter : reporters) {
@@ -478,6 +473,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
}
}
+ @SuppressWarnings("deprecation")
protected static final ConfigDef CONNECTOR_CONFIG_DEF = ConnectorConfig.configDef()
.define(
ENABLED,
@@ -720,6 +716,13 @@ public class MirrorConnectorConfig extends AbstractConfig {
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+ .define(
+ CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+ ConfigDef.Type.BOOLEAN,
+ true,
+ ConfigDef.Importance.LOW,
+ CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC
+ )
.withClientSslSupport()
.withClientSaslSupport();
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index ab8e33768c8..3765729c79e 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.test.MockMetricsReporter;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -335,4 +337,24 @@ public class MirrorConnectorConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testMetricsReporters() {
+ Map<String, String> connectorProps = makeProps("metric.reporters", MockMetricsReporter.class.getName());
+ MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+ assertEquals(2, config.metricsReporters().size());
+
+ connectorProps.put(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+ config = new MirrorConnectorConfig(connectorProps);
+ assertEquals(1, config.metricsReporters().size());
+ }
+
+ @Test
+ public void testExplicitlyEnableJmxReporter() {
+ String reporters = MockMetricsReporter.class.getName() + "," + JmxReporter.class.getName();
+ Map<String, String> connectorProps = makeProps("metric.reporters", reporters);
+ MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+ assertEquals(2, config.metricsReporters().size());
+ }
+
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index ed81be657a0..bbebf813d5a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.Gauge;
-import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -76,14 +75,10 @@ public class ConnectMetrics {
int numSamples = config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG);
long sampleWindowMs = config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG);
String metricsRecordingLevel = config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG);
- List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-
+ List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(workerId, config);
MetricConfig metricConfig = new MetricConfig().samples(numSamples)
.timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
Sensor.RecordingLevel.forName(metricsRecordingLevel));
- JmxReporter jmxReporter = new JmxReporter();
- jmxReporter.configure(config.originals());
- reporters.add(jmxReporter);
Map<String, Object> contextLabels = new HashMap<>();
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index d8cec7173ee..9a1bbf8540f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -203,6 +203,9 @@ public class WorkerConfig extends AbstractConfig {
public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+ @Deprecated
+ public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
+
public static final String TOPIC_TRACKING_ENABLE_CONFIG = "topic.tracking.enable";
protected static final String TOPIC_TRACKING_ENABLE_DOC = "Enable tracking the set of active "
+ "topics per connector during runtime.";
@@ -284,6 +287,11 @@ public class WorkerConfig extends AbstractConfig {
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
"", Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+ .define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+ CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
.define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
ConfigDef.Type.STRING, SslClientAuth.NONE.toString(), in(Utils.enumOptions(SslClientAuth.class)), ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index d7ad3f4eb31..5f5363e64f4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -43,7 +42,6 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -84,12 +82,7 @@ public class WorkerGroupMember {
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricsTags);
- List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class,
- Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId));
- JmxReporter jmxReporter = new JmxReporter();
- jmxReporter.configure(config.originals());
- reporters.add(jmxReporter);
+ List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
Map<String, Object> contextLabels = new HashMap<>();
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index ecdcf177a74..11de9528404 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -37,6 +38,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
public class ConnectMetricsTest {
@@ -145,6 +147,30 @@ public class ConnectMetricsTest {
assertSame(originalSensor, recreatedSensor);
}
+ @Test
+ public void testMetricReporters() {
+ assertEquals(1, metrics.metrics().reporters().size());
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testDisableJmxReporter() {
+ Map<String, String> props = new HashMap<>(DEFAULT_WORKER_CONFIG);
+ props.put(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+ ConnectMetrics cm = new ConnectMetrics("worker-testDisableJmxReporter", new WorkerConfig(WorkerConfig.baseConfigDef(), props), new MockTime(), "cluster-1");
+ assertTrue(cm.metrics().reporters().isEmpty());
+ cm.stop();
+ }
+
+ @Test
+ public void testExplicitlyEnableJmxReporter() {
+ Map<String, String> props = new HashMap<>(DEFAULT_WORKER_CONFIG);
+ props.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+ ConnectMetrics cm = new ConnectMetrics("worker-testExplicitlyEnableJmxReporter", new WorkerConfig(WorkerConfig.baseConfigDef(), props), new MockTime(), "cluster-1");
+ assertEquals(1, cm.metrics().reporters().size());
+ cm.stop();
+ }
+
private Sensor addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
ConnectMetricsRegistry registry = connectMetrics.registry();
ConnectMetrics.MetricGroup metricGroup = connectMetrics.group(registry.taskGroupName(),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
index fa910bb54c6..a10643a5aa0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.utils.LogContext;
@@ -70,16 +71,22 @@ public class WorkerGroupMemberTest {
utilities.verify(() -> WorkerConfig.lookupKafkaClusterId(any()));
}
- boolean entered = false;
+ boolean foundMockReporter = false;
+ boolean foundJmxReporter = false;
+ assertEquals(2, member.metrics().reporters().size());
for (MetricsReporter reporter : member.metrics().reporters()) {
if (reporter instanceof MockConnectMetrics.MockMetricsReporter) {
- entered = true;
+ foundMockReporter = true;
MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter;
assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().contextLabels().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
assertEquals("group-1", mockMetricsReporter.getMetricsContext().contextLabels().get(WorkerConfig.CONNECT_GROUP_ID));
}
+ if (reporter instanceof JmxReporter) {
+ foundJmxReporter = true;
+ }
}
- assertTrue("Failed to verify MetricsReporter", entered);
+ assertTrue("Failed to find MockMetricsReporter", foundMockReporter);
+ assertTrue("Failed to find JmxReporter", foundJmxReporter);
MetricName name = member.metrics().metricName("test.avg", "grp1");
member.metrics().addMetric(name, new Avg());
@@ -87,4 +94,29 @@ public class WorkerGroupMemberTest {
//verify metric exists with correct prefix
assertNotNull(server.getObjectInstance(new ObjectName("kafka.connect:type=grp1,client-id=client-1")));
}
+
+ @Test
+ public void testDisableJmxReporter() {
+ WorkerGroupMember member;
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("group.id", "group-1");
+ workerProps.put("offset.storage.topic", "topic-1");
+ workerProps.put("config.storage.topic", "topic-1");
+ workerProps.put("status.storage.topic", "topic-1");
+ workerProps.put("auto.include.jmx.reporter", "false");
+ DistributedConfig config = new DistributedConfig(workerProps);
+
+ LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
+ try (MockedStatic<WorkerConfig> utilities = mockStatic(WorkerConfig.class)) {
+ utilities.when(() -> WorkerConfig.lookupKafkaClusterId(any())).thenReturn("cluster-1");
+ member = new WorkerGroupMember(config, "", configBackingStore, null, Time.SYSTEM, "client-1", logContext);
+ utilities.verify(() -> WorkerConfig.lookupKafkaClusterId(any()));
+ }
+
+ assertTrue(member.metrics().reporters().isEmpty());
+ member.stop();
+ }
+
}
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 99f8234059e..bd89780a4c7 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -30,7 +30,7 @@ import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SslConfigs}
-import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
@@ -750,7 +750,7 @@ class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) extends Reconfigurable {
private val reporterState = new DynamicMetricReporterState(brokerId, config, metrics, clusterId)
- private val currentReporters = reporterState.currentReporters
+ private[server] val currentReporters = reporterState.currentReporters
private val dynamicConfig = reporterState.dynamicConfig
private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] =
@@ -824,6 +824,7 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me
updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
propsOverride.forKeyValue((k, v) => props.put(k, v))
val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props)
+
// Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange,
// which provides that MetricsReporter.contextChange must be called before the first call to MetricsReporter.init.
// The first call to MetricsReporter.init is done when we call metrics.addReporter below.
@@ -839,8 +840,15 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me
currentReporters.remove(className).foreach(metrics.removeReporter)
}
+ @nowarn("cat=deprecation")
private[server] def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
- configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
+ val reporters = mutable.Buffer[String]()
+ reporters ++= configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
+ if (configs.get(KafkaConfig.AutoIncludeJmxReporterProp).asInstanceOf[Boolean] &&
+ !reporters.contains(classOf[JmxReporter].getName)) {
+ reporters += classOf[JmxReporter].getName
+ }
+ reporters
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0c222f92e9e..e03192b5b09 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -241,6 +241,7 @@ object Defaults {
val MetricSampleWindowMs = 30000
val MetricReporterClasses = ""
val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
+ val AutoIncludeJmxReporter = true
/** ********* Kafka Yammer Metrics Reporter Configuration ***********/
@@ -563,6 +564,9 @@ object KafkaConfig {
val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
+ @deprecated
+ @nowarn("cat=deprecation")
+ val AutoIncludeJmxReporterProp: String = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG
/** ********* Kafka Yammer Metrics Reporters Configuration ***********/
val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters"
@@ -1004,6 +1008,7 @@ object KafkaConfig {
val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC
val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
+ val AutoIncludeJmxReporterDoc = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC
/** ********* Kafka Yammer Metrics Reporter Configuration ***********/
@@ -1311,6 +1316,7 @@ object KafkaConfig {
.define(MetricSampleWindowMsProp, LONG, Defaults.MetricSampleWindowMs, atLeast(1), LOW, MetricSampleWindowMsDoc)
.define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
.define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc)
+ .define(AutoIncludeJmxReporterProp, BOOLEAN, Defaults.AutoIncludeJmxReporter, LOW, AutoIncludeJmxReporterDoc)
/** ********* Kafka Yammer Metrics Reporter Configuration for docs ***********/
.define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc)
diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala
index 5d902c58312..d85060cc72d 100644
--- a/core/src/main/scala/kafka/server/Server.scala
+++ b/core/src/main/scala/kafka/server/Server.scala
@@ -16,12 +16,13 @@
*/
package kafka.server
-import java.util.concurrent.TimeUnit
-
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
import org.apache.kafka.common.utils.Time
+import java.util
+import java.util.concurrent.TimeUnit
+
trait Server {
def startup(): Unit
@@ -49,9 +50,8 @@ object Server {
time: Time,
metricsContext: KafkaMetricsContext
): Metrics = {
- val defaultReporters = initializeDefaultReporters(config)
val metricConfig = buildMetricsConfig(config)
- new Metrics(metricConfig, defaultReporters, time, true, metricsContext)
+ new Metrics(metricConfig, new util.ArrayList[MetricsReporter](), time, true, metricsContext)
}
def buildMetricsConfig(
@@ -80,17 +80,6 @@ object Server {
new KafkaMetricsContext(MetricsPrefix, contextLabels)
}
- private def initializeDefaultReporters(
- config: KafkaConfig
- ): java.util.List[MetricsReporter] = {
- val jmxReporter = new JmxReporter()
- jmxReporter.configure(config.originals)
-
- val reporters = new java.util.ArrayList[MetricsReporter]
- reporters.add(jmxReporter)
- reporters
- }
-
sealed trait ProcessStatus
case object SHUTDOWN extends ProcessStatus
case object STARTING extends ProcessStatus
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 5fc84def2c1..578e8afb402 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -21,7 +21,6 @@ import java.{lang, util}
import java.util.{Map => JMap, Properties}
import java.util.concurrent.CompletionStage
import java.util.concurrent.atomic.AtomicReference
-
import kafka.controller.KafkaController
import kafka.log.{LogConfig, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer}
@@ -31,8 +30,10 @@ import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.server.authorizer._
+import org.apache.kafka.test.MockMetricsReporter
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers.anyString
@@ -531,6 +532,60 @@ class DynamicBrokerConfigTest {
// Even if One property is invalid, the below should get correctly updated.
assertEquals(1111, config.messageMaxBytes)
}
+
+ @Test
+ def testUpdateMetricReporters(): Unit = {
+ val brokerId = 0
+ val origProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect, port = 8181)
+
+ val config = KafkaConfig(origProps)
+ val serverMock = Mockito.mock(classOf[KafkaBroker])
+ val metrics = Mockito.mock(classOf[Metrics])
+
+ Mockito.when(serverMock.config).thenReturn(config)
+
+ config.dynamicConfig.initialize(None)
+ val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId")
+ config.dynamicConfig.addReconfigurable(m)
+ assertEquals(1, m.currentReporters.size)
+ assertEquals(classOf[JmxReporter].getName, m.currentReporters.keySet.head)
+
+ val props = new Properties()
+ props.put(KafkaConfig.MetricReporterClassesProp, classOf[MockMetricsReporter].getName)
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(2, m.currentReporters.size)
+ assertEquals(Set(classOf[JmxReporter].getName, classOf[MockMetricsReporter].getName), m.currentReporters.keySet)
+ }
+
+ @Test
+ @nowarn("cat=deprecation")
+ def testUpdateMetricReportersNoJmxReporter(): Unit = {
+ val brokerId = 0
+ val origProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect, port = 8181)
+ origProps.put(KafkaConfig.AutoIncludeJmxReporterProp, "false")
+
+ val config = KafkaConfig(origProps)
+ val serverMock = Mockito.mock(classOf[KafkaBroker])
+ val metrics = Mockito.mock(classOf[Metrics])
+
+ Mockito.when(serverMock.config).thenReturn(config)
+
+ config.dynamicConfig.initialize(None)
+ val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId")
+ config.dynamicConfig.addReconfigurable(m)
+ assertTrue(m.currentReporters.isEmpty)
+
+ val props = new Properties()
+ props.put(KafkaConfig.MetricReporterClassesProp, classOf[MockMetricsReporter].getName)
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(1, m.currentReporters.size)
+ assertEquals(classOf[MockMetricsReporter].getName, m.currentReporters.keySet.head)
+
+ props.remove(KafkaConfig.MetricReporterClassesProp)
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertTrue(m.currentReporters.isEmpty)
+ }
+
}
class TestDynamicThreadPool() extends BrokerReconfigurable {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 02b576c186b..ac24e709fbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -104,7 +103,6 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
@@ -992,17 +990,13 @@ public class KafkaStreams implements AutoCloseable {
return streamThread;
}
- private static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) {
+ static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) {
final MetricConfig metricConfig = new MetricConfig()
.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
- final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class,
- Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
- final JmxReporter jmxReporter = new JmxReporter();
- jmxReporter.configure(config.originals());
- reporters.add(jmxReporter);
+ final List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
+
final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
return new Metrics(metricConfig, reporters, time, metricsContext);
@@ -1253,7 +1247,7 @@ public class KafkaStreams implements AutoCloseable {
private static ScheduledExecutorService maybeCreateRocksDBMetricsRecordingService(final String clientId,
final StreamsConfig config) {
- if (RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
+ if (RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
return Executors.newSingleThreadScheduledExecutor(r -> {
final Thread thread = new Thread(r, clientId + "-RocksDBMetricsRecordingTrigger");
thread.setDaemon(true);
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 8976a0983fa..325cbec90d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -554,6 +554,10 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+ /** {@code auto.include.jmx.reporter} */
+ @Deprecated
+ public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
+
/** {@code num.standby.replicas} */
@SuppressWarnings("WeakerAccess")
public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
@@ -917,6 +921,11 @@ public class StreamsConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+ .define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+ CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
.define(POLL_MS_CONFIG,
Type.LONG,
100L,