You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/08/24 16:30:50 UTC

[kafka] branch trunk updated: KAFKA-10360: Allow disabling JMX Reporter (KIP-830) (#12046)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0507597597e KAFKA-10360: Allow disabling JMX Reporter (KIP-830) (#12046)
0507597597e is described below

commit 0507597597e1d2d9adaa33b87e5ea509e12fd2f0
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Wed Aug 24 18:30:31 2022 +0200

    KAFKA-10360: Allow disabling JMX Reporter (KIP-830) (#12046)
    
    This implements KIP-830: https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter
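    It adds a new configuration `auto.include.jmx.reporter` that can be set to `false` to disable the JMX Reporter. This configuration is deprecated on introduction and will be removed in the next major version (Kafka 4.0); users should instead list `org.apache.kafka.common.metrics.JmxReporter` in `metric.reporters` to enable the JMX Reporter.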
    
    Reviewers: Tom Bentley <tb...@redhat.com>, Christo Lolov <ch...@yahoo.com>
---
 .../apache/kafka/clients/CommonClientConfigs.java  | 28 ++++++++
 .../kafka/clients/admin/AdminClientConfig.java     |  9 +++
 .../kafka/clients/admin/KafkaAdminClient.java      | 16 ++---
 .../kafka/clients/consumer/ConsumerConfig.java     | 11 ++++
 .../kafka/clients/consumer/KafkaConsumer.java      |  7 +-
 .../kafka/clients/producer/KafkaProducer.java      |  8 +--
 .../kafka/clients/producer/ProducerConfig.java     |  9 +++
 .../kafka/clients/CommonClientConfigsTest.java     | 45 ++++++++++++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 77 ++++++++++++++++------
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 22 +++++++
 .../kafka/clients/producer/KafkaProducerTest.java  | 26 +++++++-
 .../connect/mirror/MirrorConnectorConfig.java      | 15 +++--
 .../connect/mirror/MirrorConnectorConfigTest.java  | 22 +++++++
 .../kafka/connect/runtime/ConnectMetrics.java      |  7 +-
 .../apache/kafka/connect/runtime/WorkerConfig.java |  8 +++
 .../runtime/distributed/WorkerGroupMember.java     |  9 +--
 .../kafka/connect/runtime/ConnectMetricsTest.java  | 26 ++++++++
 .../runtime/distributed/WorkerGroupMemberTest.java | 38 ++++++++++-
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 14 +++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  6 ++
 core/src/main/scala/kafka/server/Server.scala      | 21 ++----
 .../kafka/server/DynamicBrokerConfigTest.scala     | 57 +++++++++++++++-
 .../org/apache/kafka/streams/KafkaStreams.java     | 14 ++--
 .../org/apache/kafka/streams/StreamsConfig.java    |  9 +++
 24 files changed, 406 insertions(+), 98 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index b142867abc9..d88aa0a6a1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -19,12 +19,16 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -98,6 +102,10 @@ public class CommonClientConfigs {
 
     public static final String METRICS_CONTEXT_PREFIX = "metrics.context.";
 
+    @Deprecated
+    public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = "auto.include.jmx.reporter";
+    public static final String AUTO_INCLUDE_JMX_REPORTER_DOC = "Deprecated. Whether to automatically include JmxReporter even if it's not listed in <code>metric.reporters</code>. This configuration will be removed in Kafka 4.0, users should instead include <code>org.apache.kafka.common.metrics.JmxReporter</code> in <code>metric.reporters</code> in order to enable the JmxReporter.";
+
     public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
     public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " +
         Utils.join(SecurityProtocol.names(), ", ") + ".";
@@ -216,4 +224,24 @@ public class CommonClientConfigs {
             }
         }
     }
+
+    public static List<MetricsReporter> metricsReporters(AbstractConfig config) {
+        return metricsReporters(Collections.emptyMap(), config);
+    }
+
+    public static List<MetricsReporter> metricsReporters(String clientId, AbstractConfig config) {
+        return metricsReporters(Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId), config);
+    }
+
+    public static List<MetricsReporter> metricsReporters(Map<String, Object> clientIdOverride, AbstractConfig config) {
+        List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                MetricsReporter.class, clientIdOverride);
+        if (config.getBoolean(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG) &&
+                reporters.stream().noneMatch(r -> JmxReporter.class.equals(r.getClass()))) {
+            JmxReporter jmxReporter = new JmxReporter();
+            jmxReporter.configure(config.originals(clientIdOverride));
+            reporters.add(jmxReporter);
+        }
+        return reporters;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 37af3864103..0afcd709f04 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -102,6 +102,10 @@ public class AdminClientConfig extends AbstractConfig {
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
     private static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;
 
+    @Deprecated
+    public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
+    public static final String AUTO_INCLUDE_JMX_REPORTER_DOC = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC;
+
     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
     private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;
 
@@ -198,6 +202,11 @@ public class AdminClientConfig extends AbstractConfig {
                                         in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),
                                         Importance.LOW,
                                         METRICS_RECORDING_LEVEL_DOC)
+                                .define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+                                        Type.BOOLEAN,
+                                        true,
+                                        Importance.LOW,
+                                        AUTO_INCLUDE_JMX_REPORTER_DOC)
                                 .define(CLIENT_DNS_LOOKUP_CONFIG,
                                         Type.STRING,
                                         ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e5df779b616..95fb0ed0d70 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -153,7 +153,6 @@ import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
 import org.apache.kafka.common.message.UnregisterBrokerRequestData;
 import org.apache.kafka.common.message.UpdateFeaturesRequestData;
 import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
-import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -354,7 +353,7 @@ public class KafkaAdminClient extends AdminClient {
     /**
      * The metrics for this KafkaAdminClient.
      */
-    private final Metrics metrics;
+    final Metrics metrics;
 
     /**
      * The network client to use.
@@ -451,6 +450,10 @@ public class KafkaAdminClient extends AdminClient {
         return "adminclient-" + ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement();
     }
 
+    String getClientId() {
+        return clientId;
+    }
+
     /**
      * Get the deadline for a particular call.
      *
@@ -506,17 +509,12 @@ public class KafkaAdminClient extends AdminClient {
                     config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
                     config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
             metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
-            List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class,
-                Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));
+            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
             Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
                 .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                 .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                 .tags(metricTags);
-            JmxReporter jmxReporter = new JmxReporter();
-            jmxReporter.configure(config.originals());
-            reporters.add(jmxReporter);
             MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                     config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
             metrics = new Metrics(metricConfig, reporters, time, metricsContext);
@@ -3344,7 +3342,7 @@ public class KafkaAdminClient extends AdminClient {
                         ListGroupsRequest.Builder createRequest(int timeoutMs) {
                             List<String> states = options.states()
                                     .stream()
-                                    .map(s -> s.toString())
+                                    .map(ConsumerGroupState::toString)
                                     .collect(Collectors.toList());
                             return new ListGroupsRequest.Builder(new ListGroupsRequestData().setStatesFilter(states));
                         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 5a217705290..f8bc97ec8a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -239,6 +239,12 @@ public class ConsumerConfig extends AbstractConfig {
      */
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
+    /**
+     * <code>auto.include.jmx.reporter</code>
+     * */
+    @Deprecated
+    public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
+
     /**
      * <code>check.crcs</code>
      */
@@ -488,6 +494,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         new ConfigDef.NonNullValidator(),
                                         Importance.LOW,
                                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                                .define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+                                        Type.BOOLEAN,
+                                        true,
+                                        Importance.LOW,
+                                        CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
                                 .define(KEY_DESERIALIZER_CLASS_CONFIG,
                                         Type.CLASS,
                                         Importance.HIGH,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6ffb772915d..f07846945a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -43,7 +43,6 @@ import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -876,11 +875,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                 .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                 .tags(metricsTags);
-        List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
-        JmxReporter jmxReporter = new JmxReporter();
-        jmxReporter.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)));
-        reporters.add(jmxReporter);
+        List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
         MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                 config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
         return new Metrics(metricConfig, reporters, time, metricsContext);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ec8b8725c88..b0421fbd6e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -55,7 +55,6 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -365,12 +364,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                     .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                     .tags(metricTags);
-            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                    MetricsReporter.class,
-                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
-            JmxReporter jmxReporter = new JmxReporter();
-            jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
-            reporters.add(jmxReporter);
+            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
             MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                     config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index aff5e49cfcb..df760a79c0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -230,6 +230,10 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>metric.reporters</code> */
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
+    /** <code>auto.include.jmx.reporter</code> */
+    @Deprecated
+    public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
+
     // max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
     private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
 
@@ -406,6 +410,11 @@ public class ProducerConfig extends AbstractConfig {
                                         new ConfigDef.NonNullValidator(),
                                         Importance.LOW,
                                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                                .define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+                                        Type.BOOLEAN,
+                                        true,
+                                        Importance.LOW,
+                                        CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
                                 .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
                                         Type.INT,
                                         5,
diff --git a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
index 8b33868aaf0..c38f3e6a030 100644
--- a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
@@ -21,12 +21,15 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -36,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CommonClientConfigsTest {
+    @SuppressWarnings("deprecation")
     private static class TestConfig extends AbstractConfig {
         private static final ConfigDef CONFIG;
         static {
@@ -62,7 +66,18 @@ public class CommonClientConfigsTest {
                     ConfigDef.Type.STRING,
                     SaslConfigs.DEFAULT_SASL_MECHANISM,
                     ConfigDef.Importance.MEDIUM,
-                    SaslConfigs.SASL_MECHANISM_DOC);
+                    SaslConfigs.SASL_MECHANISM_DOC)
+                .define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                    ConfigDef.Type.LIST,
+                    Collections.emptyList(),
+                    new ConfigDef.NonNullValidator(),
+                    ConfigDef.Importance.LOW,
+                    CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                .define(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+                    ConfigDef.Type.BOOLEAN,
+                    true,
+                    ConfigDef.Importance.LOW,
+                    CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC);
         }
 
         @Override
@@ -114,4 +129,32 @@ public class CommonClientConfigsTest {
         ce = assertThrows(ConfigException.class, () -> new TestConfig(configs));
         assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));
     }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testMetricsReporters() {
+        TestConfig config = new TestConfig(Collections.emptyMap());
+        List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters("clientId", config);
+        assertEquals(1, reporters.size());
+        assertTrue(reporters.get(0) instanceof JmxReporter);
+
+        config = new TestConfig(Collections.singletonMap(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false"));
+        reporters = CommonClientConfigs.metricsReporters("clientId", config);
+        assertTrue(reporters.isEmpty());
+
+        config = new TestConfig(Collections.singletonMap(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, JmxReporter.class.getName()));
+        reporters = CommonClientConfigs.metricsReporters("clientId", config);
+        assertEquals(1, reporters.size());
+        assertTrue(reporters.get(0) instanceof JmxReporter);
+
+        Map<String, String> props = new HashMap<>();
+        props.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, JmxReporter.class.getName() + "," + MyJmxReporter.class.getName());
+        config = new TestConfig(props);
+        reporters = CommonClientConfigs.metricsReporters("clientId", config);
+        assertEquals(2, reporters.size());
+    }
+
+    public static class MyJmxReporter extends JmxReporter {
+        public MyJmxReporter() {}
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 193457655a5..9ee76c1cc57 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -209,6 +209,7 @@ import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -232,6 +233,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -342,6 +344,41 @@ public class KafkaAdminClientTest {
                 KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
     }
 
+    @Test
+    public void testMetricsReporterAutoGeneratedClientId() {
+        Properties props = new Properties();
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
+
+        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) admin.metrics.reporters().get(0);
+
+        assertEquals(admin.getClientId(), mockMetricsReporter.clientId);
+        assertEquals(2, admin.metrics.reporters().size());
+        admin.close();
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testDisableJmxReporter() {
+        Properties props = new Properties();
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(AdminClientConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
+        assertTrue(admin.metrics.reporters().isEmpty());
+        admin.close();
+    }
+
+    @Test
+    public void testExplicitlyEnableJmxReporter() {
+        Properties props = new Properties();
+        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+        KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
+        assertEquals(1, admin.metrics.reporters().size());
+        admin.close();
+    }
+
     private static Cluster mockCluster(int numNodes, int controllerIndex) {
         HashMap<Integer, Node> nodes = new HashMap<>();
         for (int i = 0; i < numNodes; i++)
@@ -820,7 +857,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(RequestTestUtils.metadataResponse(env.cluster().nodes(),
                 env.cluster().clusterResource().clusterId(),
                 1,
-                Collections.<MetadataResponse.TopicMetadata>emptyList()));
+                Collections.emptyList()));
             env.kafkaClient().prepareResponseFrom(
                 prepareCreateTopicsResponse("myTopic", Errors.NONE),
                 env.cluster().nodeById(1));
@@ -1321,7 +1358,7 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testAdminClientApisAuthenticationFailure() throws Exception {
+    public void testAdminClientApisAuthenticationFailure() {
         Cluster cluster = mockBootstrapCluster();
         try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
                 newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
@@ -1333,7 +1370,7 @@ public class KafkaAdminClientTest {
         }
     }
 
-    private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
+    private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) {
         ExecutionException e = assertThrows(ExecutionException.class, () -> env.adminClient().createTopics(
             singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
             new CreateTopicsOptions().timeoutMs(10000)).all().get());
@@ -1365,7 +1402,7 @@ public class KafkaAdminClientTest {
             "Expected an authentication error, but got " + Utils.stackTrace(e));
     }
 
-    private void callClientQuotasApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
+    private void callClientQuotasApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) {
         ExecutionException e = assertThrows(ExecutionException.class,
             () -> env.adminClient().describeClientQuotas(ClientQuotaFilter.all()).entities().get());
         assertTrue(e.getCause() instanceof AuthenticationException,
@@ -1615,7 +1652,7 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testDescribeConfigsPartialResponse() throws Exception {
+    public void testDescribeConfigsPartialResponse() {
         ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "topic");
         ConfigResource topic2 = new ConfigResource(ConfigResource.Type.TOPIC, "topic2");
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -2242,8 +2279,8 @@ public class KafkaAdminClientTest {
         partitionInfos.add(new PartitionInfo("my_topic", 3, nodes.get(0), new Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
         partitionInfos.add(new PartitionInfo("my_topic", 4, nodes.get(0), new Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
         Cluster cluster = new Cluster("mockClusterId", nodes.values(),
-                partitionInfos, Collections.<String>emptySet(),
-                Collections.<String>emptySet(), nodes.get(0));
+                partitionInfos, Collections.emptySet(),
+                Collections.emptySet(), nodes.get(0));
 
         TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
         TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
@@ -4685,8 +4722,8 @@ public class KafkaAdminClientTest {
                 "mockClusterId",
                 Arrays.asList(node0),
                 pInfos,
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
                 node0);
 
         final TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -4755,8 +4792,8 @@ public class KafkaAdminClientTest {
                 "mockClusterId",
                 nodes,
                 pInfos,
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
                 node0);
 
         final TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -4822,8 +4859,8 @@ public class KafkaAdminClientTest {
                 "mockClusterId",
                 nodes,
                 pInfos,
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
                 node0);
 
         final TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -5294,8 +5331,8 @@ public class KafkaAdminClientTest {
                 "mockClusterId",
                 nodes,
                 pInfos,
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
                 node0);
 
         final TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -5469,8 +5506,8 @@ public class KafkaAdminClientTest {
                 "mockClusterId",
                 nodes,
                 pInfos,
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
                 node0);
 
         final TopicPartition tp1 = new TopicPartition("foo", 0);
@@ -5501,8 +5538,8 @@ public class KafkaAdminClientTest {
                 "mockClusterId",
                 nodes,
                 pInfos,
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
                 node0);
 
         final TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -5956,7 +5993,7 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testAlterUserScramCredentialsUnknownMechanism() throws Exception {
+    public void testAlterUserScramCredentialsUnknownMechanism() {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index e7f25345c6d..47370d38270 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -209,6 +209,28 @@ public class KafkaConsumerTest {
         MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metrics.reporters().get(0);
 
         assertEquals(consumer.getClientId(), mockMetricsReporter.clientId);
+        assertEquals(2, consumer.metrics.reporters().size());
+        consumer.close();
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testDisableJmxReporter() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
+        assertTrue(consumer.metrics.reporters().isEmpty());
+        consumer.close();
+    }
+
+    @Test
+    public void testExplicitlyEnableJmxReporter() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
+        assertEquals(1, consumer.metrics.reporters().size());
         consumer.close();
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index dc7db382a62..a1a854fc671 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -172,7 +172,7 @@ public class KafkaProducerTest {
                   KafkaClient kafkaClient,
                   ProducerInterceptors<K, V> interceptors,
                   Time time) {
-        return new KafkaProducer<K, V>(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
+        return new KafkaProducer<>(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
             keySerializer, valueSerializer, metadata, kafkaClient, interceptors, time);
     }
 
@@ -455,6 +455,28 @@ public class KafkaProducerTest {
         MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().get(0);
 
         assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
+        assertEquals(2, producer.metrics.reporters().size());
+        producer.close();
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testDisableJmxReporter() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        assertTrue(producer.metrics.reporters().isEmpty());
+        producer.close();
+    }
+
+    @Test
+    public void testExplicitlyEnableJmxReporter() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        assertEquals(1, producer.metrics.reporters().size());
         producer.close();
     }
 
@@ -1901,7 +1923,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testCallbackAndInterceptorHandleError() throws Exception {
+    public void testCallbackAndInterceptorHandleError() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index d59f4bc7664..e367d3aab72 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.config.ConfigDef.ValidString;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -320,11 +319,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
     }
 
     List<MetricsReporter> metricsReporters() {
-        List<MetricsReporter> reporters = getConfiguredInstances(
-                CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-        JmxReporter jmxReporter = new JmxReporter();
-        jmxReporter.configure(this.originals());
-        reporters.add(jmxReporter);
+        List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this);
         MetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror");
 
         for (MetricsReporter reporter : reporters) {
@@ -478,6 +473,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
         }
     }
 
+    @SuppressWarnings("deprecation")
     protected static final ConfigDef CONNECTOR_CONFIG_DEF = ConnectorConfig.configDef()
             .define(
                     ENABLED,
@@ -720,6 +716,13 @@ public class MirrorConnectorConfig extends AbstractConfig {
                     in(Utils.enumOptions(SecurityProtocol.class)),
                     ConfigDef.Importance.MEDIUM,
                     CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+            .define(
+                    CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+                    ConfigDef.Type.BOOLEAN,
+                    true,
+                    ConfigDef.Importance.LOW,
+                    CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC
+            )
             .withClientSslSupport()
             .withClientSaslSupport();
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index ab8e33768c8..3765729c79e 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.test.MockMetricsReporter;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -335,4 +337,24 @@ public class MirrorConnectorConfigTest {
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testMetricsReporters() {
+        Map<String, String> connectorProps = makeProps("metric.reporters", MockMetricsReporter.class.getName());
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(2, config.metricsReporters().size());
+
+        connectorProps.put(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(1, config.metricsReporters().size());
+    }
+
+    @Test
+    public void testExplicitlyEnableJmxReporter() {
+        String reporters = MockMetricsReporter.class.getName() + "," + JmxReporter.class.getName();
+        Map<String, String> connectorProps = makeProps("metric.reporters", reporters);
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        assertEquals(2, config.metricsReporters().size());
+    }
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index ed81be657a0..bbebf813d5a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.metrics.Gauge;
-import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -76,14 +75,10 @@ public class ConnectMetrics {
         int numSamples = config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG);
         long sampleWindowMs = config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG);
         String metricsRecordingLevel = config.getString(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG);
-        List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-
+        List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(workerId, config);
         MetricConfig metricConfig = new MetricConfig().samples(numSamples)
                 .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
                         Sensor.RecordingLevel.forName(metricsRecordingLevel));
-        JmxReporter jmxReporter = new JmxReporter();
-        jmxReporter.configure(config.originals());
-        reporters.add(jmxReporter);
 
         Map<String, Object> contextLabels = new HashMap<>();
         contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index d8cec7173ee..9a1bbf8540f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -203,6 +203,9 @@ public class WorkerConfig extends AbstractConfig {
     public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
+    @Deprecated
+    public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
+
     public static final String TOPIC_TRACKING_ENABLE_CONFIG = "topic.tracking.enable";
     protected static final String TOPIC_TRACKING_ENABLE_DOC = "Enable tracking the set of active "
             + "topics per connector during runtime.";
@@ -284,6 +287,11 @@ public class WorkerConfig extends AbstractConfig {
                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
                         "", Importance.LOW,
                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                .define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+                        Type.BOOLEAN,
+                        true,
+                        Importance.LOW,
+                        CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
                 .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
                         ConfigDef.Type.STRING, SslClientAuth.NONE.toString(), in(Utils.enumOptions(SslClientAuth.class)), ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
                 .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index d7ad3f4eb31..5f5363e64f4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -43,7 +42,6 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 
 import java.net.InetSocketAddress;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -84,12 +82,7 @@ public class WorkerGroupMember {
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                     .tags(metricsTags);
-            List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
-                    MetricsReporter.class,
-                    Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId));
-            JmxReporter jmxReporter = new JmxReporter();
-            jmxReporter.configure(config.originals());
-            reporters.add(jmxReporter);
+            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
 
             Map<String, Object> contextLabels = new HashMap<>();
             contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index ecdcf177a74..11de9528404 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -37,6 +38,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 public class ConnectMetricsTest {
 
@@ -145,6 +147,30 @@ public class ConnectMetricsTest {
         assertSame(originalSensor, recreatedSensor);
     }
 
+    @Test
+    public void testMetricReporters() {
+        assertEquals(1, metrics.metrics().reporters().size());
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testDisableJmxReporter() {
+        Map<String, String> props = new HashMap<>(DEFAULT_WORKER_CONFIG);
+        props.put(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
+        ConnectMetrics cm = new ConnectMetrics("worker-testDisableJmxReporter", new WorkerConfig(WorkerConfig.baseConfigDef(), props), new MockTime(), "cluster-1");
+        assertTrue(cm.metrics().reporters().isEmpty());
+        cm.stop();
+    }
+
+    @Test
+    public void testExplicitlyEnableJmxReporter() {
+        Map<String, String> props = new HashMap<>(DEFAULT_WORKER_CONFIG);
+        props.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
+        ConnectMetrics cm = new ConnectMetrics("worker-testExplicitlyEnableJmxReporter", new WorkerConfig(WorkerConfig.baseConfigDef(), props), new MockTime(), "cluster-1");
+        assertEquals(1, cm.metrics().reporters().size());
+        cm.stop();
+    }
+
     private Sensor addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
         ConnectMetricsRegistry registry = connectMetrics.registry();
         ConnectMetrics.MetricGroup metricGroup = connectMetrics.group(registry.taskGroupName(),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
index fa910bb54c6..a10643a5aa0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.utils.LogContext;
@@ -70,16 +71,22 @@ public class WorkerGroupMemberTest {
             utilities.verify(() -> WorkerConfig.lookupKafkaClusterId(any()));
         }
 
-        boolean entered = false;
+        boolean foundMockReporter = false;
+        boolean foundJmxReporter = false;
+        assertEquals(2, member.metrics().reporters().size());
         for (MetricsReporter reporter : member.metrics().reporters()) {
             if (reporter instanceof MockConnectMetrics.MockMetricsReporter) {
-                entered = true;
+                foundMockReporter = true;
                 MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter;
                 assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().contextLabels().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
                 assertEquals("group-1", mockMetricsReporter.getMetricsContext().contextLabels().get(WorkerConfig.CONNECT_GROUP_ID));
             }
+            if (reporter instanceof JmxReporter) {
+                foundJmxReporter = true;
+            }
         }
-        assertTrue("Failed to verify MetricsReporter", entered);
+        assertTrue("Failed to find MockMetricsReporter", foundMockReporter);
+        assertTrue("Failed to find JmxReporter", foundJmxReporter);
 
         MetricName name = member.metrics().metricName("test.avg", "grp1");
         member.metrics().addMetric(name, new Avg());
@@ -87,4 +94,29 @@ public class WorkerGroupMemberTest {
         //verify metric exists with correct prefix
         assertNotNull(server.getObjectInstance(new ObjectName("kafka.connect:type=grp1,client-id=client-1")));
     }
+
+    @Test
+    public void testDisableJmxReporter() {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("group.id", "group-1");
+        workerProps.put("offset.storage.topic", "topic-1");
+        workerProps.put("config.storage.topic", "topic-1");
+        workerProps.put("status.storage.topic", "topic-1");
+        workerProps.put("auto.include.jmx.reporter", "false");
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+        LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
+        try (MockedStatic<WorkerConfig> utilities = mockStatic(WorkerConfig.class)) {
+            utilities.when(() -> WorkerConfig.lookupKafkaClusterId(any())).thenReturn("cluster-1");
+            member = new WorkerGroupMember(config, "", configBackingStore, null, Time.SYSTEM, "client-1", logContext);
+            utilities.verify(() -> WorkerConfig.lookupKafkaClusterId(any()));
+        }
+
+        assertTrue(member.metrics().reporters().isEmpty());
+        member.stop();
+    }
+
 }
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 99f8234059e..bd89780a4c7 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -30,7 +30,7 @@ import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SslConfigs}
-import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
@@ -750,7 +750,7 @@ class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
 
 class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) extends Reconfigurable {
   private val reporterState = new DynamicMetricReporterState(brokerId, config, metrics, clusterId)
-  private val currentReporters = reporterState.currentReporters
+  private[server] val currentReporters = reporterState.currentReporters
   private val dynamicConfig = reporterState.dynamicConfig
 
   private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] =
@@ -824,6 +824,7 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me
     updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
     propsOverride.forKeyValue((k, v) => props.put(k, v))
     val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props)
+
     // Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange,
     // which provides that MetricsReporter.contextChange must be called before the first call to MetricsReporter.init.
     // The first call to MetricsReporter.init is done when we call metrics.addReporter below.
@@ -839,8 +840,15 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me
     currentReporters.remove(className).foreach(metrics.removeReporter)
   }
 
+  @nowarn("cat=deprecation")
   private[server] def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
-    configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
+    val reporters = mutable.Buffer[String]()
+    reporters ++= configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
+    if (configs.get(KafkaConfig.AutoIncludeJmxReporterProp).asInstanceOf[Boolean] &&
+        !reporters.contains(classOf[JmxReporter].getName)) {
+      reporters += classOf[JmxReporter].getName
+    }
+    reporters
   }
 }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0c222f92e9e..e03192b5b09 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -241,6 +241,7 @@ object Defaults {
   val MetricSampleWindowMs = 30000
   val MetricReporterClasses = ""
   val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
+  val AutoIncludeJmxReporter = true
 
 
   /** ********* Kafka Yammer Metrics Reporter Configuration ***********/
@@ -563,6 +564,9 @@ object KafkaConfig {
   val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
   val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
   val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
+  @deprecated
+  @nowarn("cat=deprecation")
+  val AutoIncludeJmxReporterProp: String = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG
 
   /** ********* Kafka Yammer Metrics Reporters Configuration ***********/
   val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters"
@@ -1004,6 +1008,7 @@ object KafkaConfig {
   val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC
   val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
   val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
+  val AutoIncludeJmxReporterDoc = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC
 
 
   /** ********* Kafka Yammer Metrics Reporter Configuration ***********/
@@ -1311,6 +1316,7 @@ object KafkaConfig {
       .define(MetricSampleWindowMsProp, LONG, Defaults.MetricSampleWindowMs, atLeast(1), LOW, MetricSampleWindowMsDoc)
       .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
       .define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc)
+      .define(AutoIncludeJmxReporterProp, BOOLEAN, Defaults.AutoIncludeJmxReporter, LOW, AutoIncludeJmxReporterDoc)
 
       /** ********* Kafka Yammer Metrics Reporter Configuration for docs ***********/
       .define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc)
diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala
index 5d902c58312..d85060cc72d 100644
--- a/core/src/main/scala/kafka/server/Server.scala
+++ b/core/src/main/scala/kafka/server/Server.scala
@@ -16,12 +16,13 @@
  */
 package kafka.server
 
-import java.util.concurrent.TimeUnit
-
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
+import org.apache.kafka.common.metrics.{KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
 import org.apache.kafka.common.utils.Time
 
+import java.util
+import java.util.concurrent.TimeUnit
+
 
 trait Server {
   def startup(): Unit
@@ -49,9 +50,8 @@ object Server {
     time: Time,
     metricsContext: KafkaMetricsContext
   ): Metrics = {
-    val defaultReporters = initializeDefaultReporters(config)
     val metricConfig = buildMetricsConfig(config)
-    new Metrics(metricConfig, defaultReporters, time, true, metricsContext)
+    new Metrics(metricConfig, new util.ArrayList[MetricsReporter](), time, true, metricsContext)
   }
 
   def buildMetricsConfig(
@@ -80,17 +80,6 @@ object Server {
     new KafkaMetricsContext(MetricsPrefix, contextLabels)
   }
 
-  private def initializeDefaultReporters(
-    config: KafkaConfig
-  ): java.util.List[MetricsReporter] = {
-    val jmxReporter = new JmxReporter()
-    jmxReporter.configure(config.originals)
-
-    val reporters = new java.util.ArrayList[MetricsReporter]
-    reporters.add(jmxReporter)
-    reporters
-  }
-
   sealed trait ProcessStatus
   case object SHUTDOWN extends ProcessStatus
   case object STARTING extends ProcessStatus
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 5fc84def2c1..578e8afb402 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -21,7 +21,6 @@ import java.{lang, util}
 import java.util.{Map => JMap, Properties}
 import java.util.concurrent.CompletionStage
 import java.util.concurrent.atomic.AtomicReference
-
 import kafka.controller.KafkaController
 import kafka.log.{LogConfig, LogManager}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
@@ -31,8 +30,10 @@ import org.apache.kafka.common.{Endpoint, Reconfigurable}
 import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.server.authorizer._
+import org.apache.kafka.test.MockMetricsReporter
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.ArgumentMatchers.anyString
@@ -531,6 +532,60 @@ class DynamicBrokerConfigTest {
     // Even if One property is invalid, the below should get correctly updated.
     assertEquals(1111, config.messageMaxBytes)
   }
+
+  @Test
+  def testUpdateMetricReporters(): Unit = {
+    val brokerId = 0
+    val origProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect, port = 8181)
+
+    val config = KafkaConfig(origProps)
+    val serverMock = Mockito.mock(classOf[KafkaBroker])
+    val metrics = Mockito.mock(classOf[Metrics])
+
+    Mockito.when(serverMock.config).thenReturn(config)
+
+    config.dynamicConfig.initialize(None)
+    val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId")
+    config.dynamicConfig.addReconfigurable(m)
+    assertEquals(1, m.currentReporters.size)
+    assertEquals(classOf[JmxReporter].getName, m.currentReporters.keySet.head)
+
+    val props = new Properties()
+    props.put(KafkaConfig.MetricReporterClassesProp, classOf[MockMetricsReporter].getName)
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(2, m.currentReporters.size)
+    assertEquals(Set(classOf[JmxReporter].getName, classOf[MockMetricsReporter].getName), m.currentReporters.keySet)
+  }
+
+  @Test
+  @nowarn("cat=deprecation")
+  def testUpdateMetricReportersNoJmxReporter(): Unit = {
+    val brokerId = 0
+    val origProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect, port = 8181)
+    origProps.put(KafkaConfig.AutoIncludeJmxReporterProp, "false")
+
+    val config = KafkaConfig(origProps)
+    val serverMock = Mockito.mock(classOf[KafkaBroker])
+    val metrics = Mockito.mock(classOf[Metrics])
+
+    Mockito.when(serverMock.config).thenReturn(config)
+
+    config.dynamicConfig.initialize(None)
+    val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId")
+    config.dynamicConfig.addReconfigurable(m)
+    assertTrue(m.currentReporters.isEmpty)
+
+    val props = new Properties()
+    props.put(KafkaConfig.MetricReporterClassesProp, classOf[MockMetricsReporter].getName)
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(1, m.currentReporters.size)
+    assertEquals(classOf[MockMetricsReporter].getName, m.currentReporters.keySet.head)
+
+    props.remove(KafkaConfig.MetricReporterClassesProp)
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertTrue(m.currentReporters.isEmpty)
+  }
+
 }
 
 class TestDynamicThreadPool() extends BrokerReconfigurable {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 02b576c186b..ac24e709fbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -104,7 +103,6 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
@@ -992,17 +990,13 @@ public class KafkaStreams implements AutoCloseable {
         return streamThread;
     }
 
-    private static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) {
+    static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) {
         final MetricConfig metricConfig = new MetricConfig()
             .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
             .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
-        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                                                                              MetricsReporter.class,
-                                                                              Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
-        final JmxReporter jmxReporter = new JmxReporter();
-        jmxReporter.configure(config.originals());
-        reporters.add(jmxReporter);
+        final List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
+
         final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                                                                       config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
         return new Metrics(metricConfig, reporters, time, metricsContext);
@@ -1253,7 +1247,7 @@ public class KafkaStreams implements AutoCloseable {
 
     private static ScheduledExecutorService maybeCreateRocksDBMetricsRecordingService(final String clientId,
                                                                                       final StreamsConfig config) {
-        if (RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
+        if (RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
             return Executors.newSingleThreadScheduledExecutor(r -> {
                 final Thread thread = new Thread(r, clientId + "-RocksDBMetricsRecordingTrigger");
                 thread.setDaemon(true);
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 8976a0983fa..325cbec90d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -554,6 +554,10 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
 
+    /** {@code auto.include.jmx.reporter} */
+    @Deprecated
+    public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
+
     /** {@code num.standby.replicas} */
     @SuppressWarnings("WeakerAccess")
     public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
@@ -917,6 +921,11 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(0),
                     Importance.LOW,
                     CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+            .define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
+                    Type.BOOLEAN,
+                    true,
+                    Importance.LOW,
+                    CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
             .define(POLL_MS_CONFIG,
                     Type.LONG,
                     100L,