You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/10 02:56:20 UTC

[kafka] branch trunk updated: KAFKA-13689: Revert AbstractConfig code changes (#11863)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 84b41b9  KAFKA-13689: Revert AbstractConfig code changes (#11863)
84b41b9 is described below

commit 84b41b9d3ad4b45a9137197a304996759e993cde
Author: RivenSun <91...@users.noreply.github.com>
AuthorDate: Thu Mar 10 10:54:10 2022 +0800

    KAFKA-13689: Revert AbstractConfig code changes (#11863)
    
    Reviewer: Luke Chen <sh...@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  2 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |  2 +-
 .../kafka/clients/producer/KafkaProducer.java      |  2 +-
 .../apache/kafka/common/config/AbstractConfig.java | 25 +---------
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 36 --------------
 .../kafka/clients/producer/KafkaProducerTest.java  | 41 ---------------
 .../kafka/common/config/AbstractConfigTest.java    | 58 ++++------------------
 7 files changed, 15 insertions(+), 151 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index b58b411..03322fd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -592,7 +592,7 @@ public class KafkaAdminClient extends AdminClient {
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
         this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
-        config.logUnusedAndUnknown();
+        config.logUnused();
         AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
         log.debug("Kafka admin client initialized");
         thread.start();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3e7846f..0fd4ea9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -812,7 +812,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
             this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, metricGrpPrefix);
 
-            config.logUnusedAndUnknown();
+            config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
             log.debug("Kafka consumer initialized");
         } catch (Throwable t) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 610bfc8..f739336 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -429,7 +429,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
-            config.logUnusedAndUnknown();
+            config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
             log.debug("Kafka producer started");
         } catch (Throwable t) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 2b915d5..7ef4609 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -223,14 +223,6 @@ public class AbstractConfig {
     public Set<String> unused() {
         Set<String> keys = new HashSet<>(originals.keySet());
         keys.removeAll(used);
-        keys.removeAll(unknown());
-        return keys;
-    }
-
-    public Set<String> unknown() {
-        Set<String> keys = new HashSet<>(originals.keySet());
-        keys.removeAll(values.keySet());
-        keys.removeAll(used);
         return keys;
     }
 
@@ -384,24 +376,11 @@ public class AbstractConfig {
         log.info(b.toString());
     }
 
-    public void logUnusedAndUnknown() {
-        logUnused();
-        logUnknown();
-    }
-
     /**
-     * Log infos for any unused configurations to user, exclude unknown configurations
+     * Log warnings for any unused configurations
      */
-    private void logUnused() {
+    public void logUnused() {
         for (String key : unused())
-            log.info("The configuration '{}' was supplied but isn't a used config since the corresponding feature is not enabled.", key);
-    }
-
-    /**
-     * Log warnings for any unknown configurations
-     */
-    private void logUnknown() {
-        for (String key : unknown())
             log.warn("The configuration '{}' was supplied but isn't a known config.", key);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b42e798..27c108b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -186,7 +186,6 @@ public class KafkaConsumerTest {
             new AbstractMap.SimpleEntry<>(topicId2, topic2),
             new AbstractMap.SimpleEntry<>(topicId3, topic3))
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-    public final String unknownTestConfig = "unknown.test.config";
 
     private final String partitionRevoked = "Hit partition revoke ";
     private final String partitionAssigned = "Hit partition assign ";
@@ -2851,41 +2850,6 @@ public class KafkaConsumerTest {
     }
 
     @Test
-    public void testUnknownConfigs() {
-        Map<String, Object> props = new HashMap<>();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.put(unknownTestConfig, "my_value");
-        ConsumerConfig config = new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(props, new StringDeserializer(), new StringDeserializer()));
-
-        assertTrue(config.unknown().contains(unknownTestConfig));
-
-        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config, null, null)) {
-            assertTrue(config.unknown().contains(unknownTestConfig));
-            assertEquals(1, config.unknown().size());
-            assertEquals(0, config.unused().size());
-        }
-    }
-
-    @Test
-    public void testUnusedAndUnknownConfigs() {
-        Map<String, Object> props = new HashMap<>();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
-        props.put(unknownTestConfig, "my_value");
-        ConsumerConfig config = new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(props, new StringDeserializer(), new StringDeserializer()));
-
-        assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
-        assertTrue(config.unknown().contains(unknownTestConfig));
-
-        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config, null, null)) {
-            assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
-            assertTrue(config.unknown().contains(unknownTestConfig));
-            assertEquals(1, config.unknown().size());
-            assertEquals(1, config.unused().size());
-        }
-    }
-
-    @Test
     public void testAssignorNameConflict() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index bf202a9..81eb6e3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -154,8 +154,6 @@ public class KafkaProducerTest {
     private static final int DEFAULT_METADATA_IDLE_MS = 5 * 60 * 1000;
     private static final Node NODE = new Node(0, "host1", 1000);
 
-    public final String unknownTestConfig = "unknown.test.config";
-
     private static <K, V> KafkaProducer<K, V> kafkaProducer(Map<String, Object> configs,
                   Serializer<K> keySerializer,
                   Serializer<V> valueSerializer,
@@ -1880,45 +1878,6 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testUnknownConfigs() {
-        Map<String, Object> props = new HashMap<>();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.put(unknownTestConfig, "my_value");
-        ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(props,
-                new StringSerializer(), new StringSerializer()));
-
-        assertTrue(config.unknown().contains(unknownTestConfig));
-
-        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config, null, null,
-                null, null, null, Time.SYSTEM)) {
-            assertTrue(config.unknown().contains(unknownTestConfig));
-            assertEquals(1, config.unknown().size());
-            assertEquals(0, config.unused().size());
-        }
-    }
-
-    @Test
-    public void testUnusedAndUnknownConfigs() {
-        Map<String, Object> props = new HashMap<>();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
-        props.put(unknownTestConfig, "my_value");
-        ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(props,
-                new StringSerializer(), new StringSerializer()));
-
-        assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
-        assertTrue(config.unknown().contains(unknownTestConfig));
-
-        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config, null, null,
-                null, null, null, Time.SYSTEM)) {
-            assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
-            assertTrue(config.unknown().contains(unknownTestConfig));
-            assertEquals(1, config.unknown().size());
-            assertEquals(1, config.unused().size());
-        }
-    }
-
-    @Test
     public void testNullTopicName() {
         // send a record with null topic should fail
         assertThrows(IllegalArgumentException.class, () -> new ProducerRecord<>(null, 1,
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index bcd41e5..e07cdb8 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -79,10 +79,8 @@ public class AbstractConfigTest {
         TestConfig config = new TestConfig(props);
         Map<String, Object> originalsWithPrefix = config.originalsWithPrefix("foo.");
 
-        assertTrue(config.unknown().contains("foo.bar"));
-        assertFalse(config.unused().contains("foo.bar"));
+        assertTrue(config.unused().contains("foo.bar"));
         originalsWithPrefix.get("bar");
-        assertFalse(config.unknown().contains("foo.bar"));
         assertFalse(config.unused().contains("foo.bar"));
 
         Map<String, Object> expected = new HashMap<>();
@@ -104,28 +102,25 @@ public class AbstractConfigTest {
         Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix);
 
         // prefix overrides global
-        assertTrue(config.unknown().contains("prefix.sasl.mechanism"));
+        assertTrue(config.unused().contains("prefix.sasl.mechanism"));
         assertTrue(config.unused().contains("sasl.mechanism"));
         assertEquals("GSSAPI", valuesWithPrefixOverride.get("sasl.mechanism"));
         assertFalse(config.unused().contains("sasl.mechanism"));
         assertFalse(config.unused().contains("prefix.sasl.mechanism"));
-        assertFalse(config.unknown().contains("prefix.sasl.mechanism"));
 
         // prefix overrides default
-        assertTrue(config.unknown().contains("prefix.sasl.kerberos.kinit.cmd"));
+        assertTrue(config.unused().contains("prefix.sasl.kerberos.kinit.cmd"));
         assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
         assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("sasl.kerberos.kinit.cmd"));
         assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
         assertFalse(config.unused().contains("prefix.sasl.kerberos.kinit.cmd"));
-        assertFalse(config.unknown().contains("prefix.sasl.kerberos.kinit.cmd"));
 
         // prefix override with no default
-        assertTrue(config.unknown().contains("prefix.ssl.truststore.location"));
+        assertTrue(config.unused().contains("prefix.ssl.truststore.location"));
         assertFalse(config.unused().contains("ssl.truststore.location"));
         assertEquals("my location", valuesWithPrefixOverride.get("ssl.truststore.location"));
         assertFalse(config.unused().contains("ssl.truststore.location"));
         assertFalse(config.unused().contains("prefix.ssl.truststore.location"));
-        assertFalse(config.unknown().contains("prefix.ssl.truststore.location"));
 
         // global overrides default
         assertTrue(config.unused().contains("ssl.keymanager.algorithm"));
@@ -167,33 +162,29 @@ public class AbstractConfigTest {
         Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix);
 
         // prefix with mechanism overrides global
-        assertTrue(config.unknown().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
-        assertTrue(config.unknown().contains("test-mechanism.sasl.jaas.config"));
+        assertTrue(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
+        assertTrue(config.unused().contains("test-mechanism.sasl.jaas.config"));
         assertEquals(saslJaasConfig1, valuesWithPrefixOverride.get("test-mechanism.sasl.jaas.config"));
         assertEquals(saslJaasConfig3, valuesWithPrefixOverride.get("sasl.jaas.config"));
         assertFalse(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
         assertFalse(config.unused().contains("test-mechanism.sasl.jaas.config"));
         assertFalse(config.unused().contains("sasl.jaas.config"));
-        assertFalse(config.unknown().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
-        assertFalse(config.unknown().contains("test-mechanism.sasl.jaas.config"));
 
         // prefix with mechanism overrides default
         assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
-        assertTrue(config.unknown().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));
+        assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));
         assertFalse(config.unused().contains("gssapi.sasl.kerberos.kinit.cmd"));
         assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
         assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.kinit.cmd"));
         assertFalse(config.unused().contains("listener.name.listener1.sasl.kerberos.kinit.cmd"));
-        assertFalse(config.unknown().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));
 
         // prefix override for mechanism with no default
         assertFalse(config.unused().contains("sasl.kerberos.service.name"));
-        assertTrue(config.unknown().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+        assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
         assertFalse(config.unused().contains("gssapi.sasl.kerberos.service.name"));
         assertFalse(config.unused().contains("sasl.kerberos.service.name"));
         assertEquals("testkafka", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.service.name"));
         assertFalse(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
-        assertFalse(config.unknown().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
 
         // unset with no default
         assertTrue(config.unused().contains("ssl.provider"));
@@ -237,42 +228,14 @@ public class AbstractConfigTest {
         props.put(ConfiguredFakeMetricsReporter.EXTRA_CONFIG, "my_value");
         TestConfig config = new TestConfig(props);
 
-        assertTrue(config.unknown().contains(ConfiguredFakeMetricsReporter.EXTRA_CONFIG),
-            ConfiguredFakeMetricsReporter.EXTRA_CONFIG + " should be marked unknown before getConfiguredInstances is called");
+        assertTrue(config.unused().contains(ConfiguredFakeMetricsReporter.EXTRA_CONFIG),
+                ConfiguredFakeMetricsReporter.EXTRA_CONFIG + " should be marked unused before getConfiguredInstances is called");
 
         config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
         assertFalse(config.unused().contains(ConfiguredFakeMetricsReporter.EXTRA_CONFIG),
             ConfiguredFakeMetricsReporter.EXTRA_CONFIG + " should be marked as used");
     }
 
-    @Test
-    public void testUnknownConfigs() {
-        Properties props = new Properties();
-        props.put(TestConfig.UNKNOWN_TEST_CONFIG, "my_value");
-        TestConfig config = new TestConfig(props);
-
-        assertTrue(config.unknown().contains(TestConfig.UNKNOWN_TEST_CONFIG),
-                TestConfig.UNKNOWN_TEST_CONFIG + " should be marked unknown");
-        assertEquals(1, config.unknown().size());
-    }
-
-    @Test
-    public void testUnusedAndUnknownConfigs() {
-        Properties props = new Properties();
-        String configValue = "org.apache.kafka.common.config.AbstractConfigTest$ConfiguredFakeMetricsReporter";
-        props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
-        props.put(TestConfig.UNKNOWN_TEST_CONFIG, "my_value");
-        TestConfig config = new TestConfig(props);
-
-        assertTrue(config.unused().contains(TestConfig.METRIC_REPORTER_CLASSES_CONFIG));
-        assertTrue(config.unknown().contains(TestConfig.UNKNOWN_TEST_CONFIG),
-                TestConfig.UNKNOWN_TEST_CONFIG + " should be marked unknown");
-        assertEquals(1, config.unknown().size());
-
-        config.get(TestConfig.METRIC_REPORTER_CLASSES_CONFIG);
-        assertFalse(config.unused().contains(TestConfig.METRIC_REPORTER_CLASSES_CONFIG));
-    }
-
     private void testValidInputs(String configValue) {
         Properties props = new Properties();
         props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
@@ -620,7 +583,6 @@ public class AbstractConfigTest {
 
         private static final ConfigDef CONFIG;
 
-        public static final String UNKNOWN_TEST_CONFIG = "unknown.test.config";
         public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
         private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters.";