You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/10 02:56:20 UTC
[kafka] branch trunk updated: KAFKA-13689: Revert AbstractConfig code changes (#11863)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 84b41b9 KAFKA-13689: Revert AbstractConfig code changes (#11863)
84b41b9 is described below
commit 84b41b9d3ad4b45a9137197a304996759e993cde
Author: RivenSun <91...@users.noreply.github.com>
AuthorDate: Thu Mar 10 10:54:10 2022 +0800
KAFKA-13689: Revert AbstractConfig code changes (#11863)
Reviewer: Luke Chen <sh...@gmail.com>
---
.../kafka/clients/admin/KafkaAdminClient.java | 2 +-
.../kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../kafka/clients/producer/KafkaProducer.java | 2 +-
.../apache/kafka/common/config/AbstractConfig.java | 25 +---------
.../kafka/clients/consumer/KafkaConsumerTest.java | 36 --------------
.../kafka/clients/producer/KafkaProducerTest.java | 41 ---------------
.../kafka/common/config/AbstractConfigTest.java | 58 ++++------------------
7 files changed, 15 insertions(+), 151 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index b58b411..03322fd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -592,7 +592,7 @@ public class KafkaAdminClient extends AdminClient {
new TimeoutProcessorFactory() : timeoutProcessorFactory;
this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
- config.logUnusedAndUnknown();
+ config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka admin client initialized");
thread.start();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3e7846f..0fd4ea9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -812,7 +812,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, metricGrpPrefix);
- config.logUnusedAndUnknown();
+ config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka consumer initialized");
} catch (Throwable t) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 610bfc8..f739336 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -429,7 +429,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
- config.logUnusedAndUnknown();
+ config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");
} catch (Throwable t) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 2b915d5..7ef4609 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -223,14 +223,6 @@ public class AbstractConfig {
public Set<String> unused() {
Set<String> keys = new HashSet<>(originals.keySet());
keys.removeAll(used);
- keys.removeAll(unknown());
- return keys;
- }
-
- public Set<String> unknown() {
- Set<String> keys = new HashSet<>(originals.keySet());
- keys.removeAll(values.keySet());
- keys.removeAll(used);
return keys;
}
@@ -384,24 +376,11 @@ public class AbstractConfig {
log.info(b.toString());
}
- public void logUnusedAndUnknown() {
- logUnused();
- logUnknown();
- }
-
/**
- * Log infos for any unused configurations to user, exclude unknown configurations
+ * Log warnings for any unused configurations
*/
- private void logUnused() {
+ public void logUnused() {
for (String key : unused())
- log.info("The configuration '{}' was supplied but isn't a used config since the corresponding feature is not enabled.", key);
- }
-
- /**
- * Log warnings for any unknown configurations
- */
- private void logUnknown() {
- for (String key : unknown())
log.warn("The configuration '{}' was supplied but isn't a known config.", key);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b42e798..27c108b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -186,7 +186,6 @@ public class KafkaConsumerTest {
new AbstractMap.SimpleEntry<>(topicId2, topic2),
new AbstractMap.SimpleEntry<>(topicId3, topic3))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- public final String unknownTestConfig = "unknown.test.config";
private final String partitionRevoked = "Hit partition revoke ";
private final String partitionAssigned = "Hit partition assign ";
@@ -2851,41 +2850,6 @@ public class KafkaConsumerTest {
}
@Test
- public void testUnknownConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- props.put(unknownTestConfig, "my_value");
- ConsumerConfig config = new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(props, new StringDeserializer(), new StringDeserializer()));
-
- assertTrue(config.unknown().contains(unknownTestConfig));
-
- try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config, null, null)) {
- assertTrue(config.unknown().contains(unknownTestConfig));
- assertEquals(1, config.unknown().size());
- assertEquals(0, config.unused().size());
- }
- }
-
- @Test
- public void testUnusedAndUnknownConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
- props.put(unknownTestConfig, "my_value");
- ConsumerConfig config = new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(props, new StringDeserializer(), new StringDeserializer()));
-
- assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
- assertTrue(config.unknown().contains(unknownTestConfig));
-
- try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config, null, null)) {
- assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
- assertTrue(config.unknown().contains(unknownTestConfig));
- assertEquals(1, config.unknown().size());
- assertEquals(1, config.unused().size());
- }
- }
-
- @Test
public void testAssignorNameConflict() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index bf202a9..81eb6e3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -154,8 +154,6 @@ public class KafkaProducerTest {
private static final int DEFAULT_METADATA_IDLE_MS = 5 * 60 * 1000;
private static final Node NODE = new Node(0, "host1", 1000);
- public final String unknownTestConfig = "unknown.test.config";
-
private static <K, V> KafkaProducer<K, V> kafkaProducer(Map<String, Object> configs,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
@@ -1880,45 +1878,6 @@ public class KafkaProducerTest {
}
@Test
- public void testUnknownConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- props.put(unknownTestConfig, "my_value");
- ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(props,
- new StringSerializer(), new StringSerializer()));
-
- assertTrue(config.unknown().contains(unknownTestConfig));
-
- try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config, null, null,
- null, null, null, Time.SYSTEM)) {
- assertTrue(config.unknown().contains(unknownTestConfig));
- assertEquals(1, config.unknown().size());
- assertEquals(0, config.unused().size());
- }
- }
-
- @Test
- public void testUnusedAndUnknownConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
- props.put(unknownTestConfig, "my_value");
- ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(props,
- new StringSerializer(), new StringSerializer()));
-
- assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
- assertTrue(config.unknown().contains(unknownTestConfig));
-
- try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config, null, null,
- null, null, null, Time.SYSTEM)) {
- assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
- assertTrue(config.unknown().contains(unknownTestConfig));
- assertEquals(1, config.unknown().size());
- assertEquals(1, config.unused().size());
- }
- }
-
- @Test
public void testNullTopicName() {
// send a record with null topic should fail
assertThrows(IllegalArgumentException.class, () -> new ProducerRecord<>(null, 1,
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index bcd41e5..e07cdb8 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -79,10 +79,8 @@ public class AbstractConfigTest {
TestConfig config = new TestConfig(props);
Map<String, Object> originalsWithPrefix = config.originalsWithPrefix("foo.");
- assertTrue(config.unknown().contains("foo.bar"));
- assertFalse(config.unused().contains("foo.bar"));
+ assertTrue(config.unused().contains("foo.bar"));
originalsWithPrefix.get("bar");
- assertFalse(config.unknown().contains("foo.bar"));
assertFalse(config.unused().contains("foo.bar"));
Map<String, Object> expected = new HashMap<>();
@@ -104,28 +102,25 @@ public class AbstractConfigTest {
Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix);
// prefix overrides global
- assertTrue(config.unknown().contains("prefix.sasl.mechanism"));
+ assertTrue(config.unused().contains("prefix.sasl.mechanism"));
assertTrue(config.unused().contains("sasl.mechanism"));
assertEquals("GSSAPI", valuesWithPrefixOverride.get("sasl.mechanism"));
assertFalse(config.unused().contains("sasl.mechanism"));
assertFalse(config.unused().contains("prefix.sasl.mechanism"));
- assertFalse(config.unknown().contains("prefix.sasl.mechanism"));
// prefix overrides default
- assertTrue(config.unknown().contains("prefix.sasl.kerberos.kinit.cmd"));
+ assertTrue(config.unused().contains("prefix.sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("prefix.sasl.kerberos.kinit.cmd"));
- assertFalse(config.unknown().contains("prefix.sasl.kerberos.kinit.cmd"));
// prefix override with no default
- assertTrue(config.unknown().contains("prefix.ssl.truststore.location"));
+ assertTrue(config.unused().contains("prefix.ssl.truststore.location"));
assertFalse(config.unused().contains("ssl.truststore.location"));
assertEquals("my location", valuesWithPrefixOverride.get("ssl.truststore.location"));
assertFalse(config.unused().contains("ssl.truststore.location"));
assertFalse(config.unused().contains("prefix.ssl.truststore.location"));
- assertFalse(config.unknown().contains("prefix.ssl.truststore.location"));
// global overrides default
assertTrue(config.unused().contains("ssl.keymanager.algorithm"));
@@ -167,33 +162,29 @@ public class AbstractConfigTest {
Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix);
// prefix with mechanism overrides global
- assertTrue(config.unknown().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
- assertTrue(config.unknown().contains("test-mechanism.sasl.jaas.config"));
+ assertTrue(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
+ assertTrue(config.unused().contains("test-mechanism.sasl.jaas.config"));
assertEquals(saslJaasConfig1, valuesWithPrefixOverride.get("test-mechanism.sasl.jaas.config"));
assertEquals(saslJaasConfig3, valuesWithPrefixOverride.get("sasl.jaas.config"));
assertFalse(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
assertFalse(config.unused().contains("test-mechanism.sasl.jaas.config"));
assertFalse(config.unused().contains("sasl.jaas.config"));
- assertFalse(config.unknown().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
- assertFalse(config.unknown().contains("test-mechanism.sasl.jaas.config"));
// prefix with mechanism overrides default
assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
- assertTrue(config.unknown().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));
+ assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("gssapi.sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("listener.name.listener1.sasl.kerberos.kinit.cmd"));
- assertFalse(config.unknown().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));
// prefix override for mechanism with no default
assertFalse(config.unused().contains("sasl.kerberos.service.name"));
- assertTrue(config.unknown().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+ assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
assertFalse(config.unused().contains("gssapi.sasl.kerberos.service.name"));
assertFalse(config.unused().contains("sasl.kerberos.service.name"));
assertEquals("testkafka", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.service.name"));
assertFalse(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
- assertFalse(config.unknown().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
// unset with no default
assertTrue(config.unused().contains("ssl.provider"));
@@ -237,42 +228,14 @@ public class AbstractConfigTest {
props.put(ConfiguredFakeMetricsReporter.EXTRA_CONFIG, "my_value");
TestConfig config = new TestConfig(props);
- assertTrue(config.unknown().contains(ConfiguredFakeMetricsReporter.EXTRA_CONFIG),
- ConfiguredFakeMetricsReporter.EXTRA_CONFIG + " should be marked unknown before getConfiguredInstances is called");
+ assertTrue(config.unused().contains(ConfiguredFakeMetricsReporter.EXTRA_CONFIG),
+ ConfiguredFakeMetricsReporter.EXTRA_CONFIG + " should be marked unused before getConfiguredInstances is called");
config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
assertFalse(config.unused().contains(ConfiguredFakeMetricsReporter.EXTRA_CONFIG),
ConfiguredFakeMetricsReporter.EXTRA_CONFIG + " should be marked as used");
}
- @Test
- public void testUnknownConfigs() {
- Properties props = new Properties();
- props.put(TestConfig.UNKNOWN_TEST_CONFIG, "my_value");
- TestConfig config = new TestConfig(props);
-
- assertTrue(config.unknown().contains(TestConfig.UNKNOWN_TEST_CONFIG),
- TestConfig.UNKNOWN_TEST_CONFIG + " should be marked unknown");
- assertEquals(1, config.unknown().size());
- }
-
- @Test
- public void testUnusedAndUnknownConfigs() {
- Properties props = new Properties();
- String configValue = "org.apache.kafka.common.config.AbstractConfigTest$ConfiguredFakeMetricsReporter";
- props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
- props.put(TestConfig.UNKNOWN_TEST_CONFIG, "my_value");
- TestConfig config = new TestConfig(props);
-
- assertTrue(config.unused().contains(TestConfig.METRIC_REPORTER_CLASSES_CONFIG));
- assertTrue(config.unknown().contains(TestConfig.UNKNOWN_TEST_CONFIG),
- TestConfig.UNKNOWN_TEST_CONFIG + " should be marked unknown");
- assertEquals(1, config.unknown().size());
-
- config.get(TestConfig.METRIC_REPORTER_CLASSES_CONFIG);
- assertFalse(config.unused().contains(TestConfig.METRIC_REPORTER_CLASSES_CONFIG));
- }
-
private void testValidInputs(String configValue) {
Properties props = new Properties();
props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
@@ -620,7 +583,6 @@ public class AbstractConfigTest {
private static final ConfigDef CONFIG;
- public static final String UNKNOWN_TEST_CONFIG = "unknown.test.config";
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters.";