You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/01 01:53:15 UTC

[GitHub] [kafka] junrao commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

junrao commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r533002272



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -314,27 +315,23 @@ public KafkaProducer(Properties properties) {
      *                         be called in the producer when the serializer is passed in directly.
      */
     public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        this(Utils.propsToMap(properties), keySerializer, valueSerializer, null, null, null,
-                Time.SYSTEM);
+        this(Utils.propsToMap(properties), keySerializer, valueSerializer);
     }
 
     // visible for testing
     @SuppressWarnings("unchecked")
-    KafkaProducer(Map<String, Object> configs,
+    KafkaProducer(ProducerConfig config,
                   Serializer<K> keySerializer,
                   Serializer<V> valueSerializer,
                   ProducerMetadata metadata,
                   KafkaClient kafkaClient,
                   ProducerInterceptors<K, V> interceptors,
                   Time time) {
-        ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer,
-                valueSerializer));
         try {
-            Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
             this.time = time;
 
-            String transactionalId = (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+            String transactionalId = (String) config.originals().get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);

Review comment:
       Could we just do `config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)` here?

##########
File path: clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
##########
@@ -79,25 +80,52 @@ public void testChannelBuilderConfigs() {
 
         // test configs with listener prefix
         Map<String, Object> configs = ChannelBuilders.channelBuilderConfigs(securityConfig, new ListenerName("listener1"));
+
         assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+        assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
         assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), "testkafka");
+        assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
+
         assertEquals(configs.get("sasl.kerberos.service.name"), "testkafkaglobal");
+        assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));

Review comment:
       Should gssapi.sasl.kerberos.service.name be sasl.kerberos.service.name?

##########
File path: clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
##########
@@ -79,25 +80,52 @@ public void testChannelBuilderConfigs() {
 
         // test configs with listener prefix
         Map<String, Object> configs = ChannelBuilders.channelBuilderConfigs(securityConfig, new ListenerName("listener1"));
+
         assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+        assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
         assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), "testkafka");
+        assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
+
         assertEquals(configs.get("sasl.kerberos.service.name"), "testkafkaglobal");
+        assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
+
         assertNull(configs.get("listener.name.listener1.sasl.kerberos.service.name"));
+        assertFalse(securityConfig.unused().contains("listener.name.listener1.sasl.kerberos.service.name"));
 
         assertNull(configs.get("plain.sasl.server.callback.handler.class"));
+        assertFalse(securityConfig.unused().contains("plain.sasl.server.callback.handler.class"));
+
         assertEquals(configs.get("listener.name.listener1.gssapi.config1.key"), "custom.config1");
+        assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.config1.key"));
+
         assertEquals(configs.get("custom.config2.key"), "custom.config2");
+        assertFalse(securityConfig.unused().contains("custom.config2.key"));
 
         // test configs without listener prefix
+        securityConfig = new TestSecurityConfig(props);

Review comment:
       Do we need to instantiate again?

##########
File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##########
@@ -105,7 +105,9 @@ public AbstractConfig(ConfigDef definition, Map<?, ?> originals,  Map<String, ?>
                 throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
 
         this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
-        this.values = definition.parse(this.originals);
+        // pass a copy to definition.parse. Otherwise, the definition.parse adds all keys of definitions to "used" group
+        // since definition.parse needs to call "RecordingMap#get" when checking all definitions.
+        this.values = definition.parse(new HashMap<>(this.originals));

Review comment:
       Hmm, I am still a bit confused. My understanding is that with the latest change, ProducerConfig will only be instantiated once and thus the passed in originals will never be a RecordingMap. But it seems this is still needed? Could you explain a bit more why this is the case?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1271,27 +1270,48 @@ public void testProducerJmxPrefix() throws  Exception {
         producer.close();
     }
 
-    private ProducerMetadata newMetadata(long refreshBackoffMs, long expirationMs) {
-        return new ProducerMetadata(refreshBackoffMs, expirationMs, defaultMetadataIdleMs,
+    private static ProducerMetadata newMetadata(long refreshBackoffMs, long expirationMs) {
+        return new ProducerMetadata(refreshBackoffMs, expirationMs, DEFAULT_METADATA_IDLE_MS,
                 new LogContext(), new ClusterResourceListeners(), Time.SYSTEM);
     }
 
     @Test
-    public void serializerShouldSeeGeneratedClientId() {
+    public void configurableObjectsShouldSeeGeneratedClientId() {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SerializerForClientId.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SerializerForClientId.class.getName());
+        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerForClientId.class.getName());
+        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorForClientId.class.getName());
 
         KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
-        assertEquals(2, SerializerForClientId.CLIENT_IDS.size());
-        assertEquals(SerializerForClientId.CLIENT_IDS.get(0), producer.getClientId());
-        assertEquals(SerializerForClientId.CLIENT_IDS.get(1), producer.getClientId());
+        assertNotNull(producer.getClientId());
+        assertNotEquals(0, producer.getClientId().length());
+        assertEquals(4, CLIENT_IDS.size());
+        CLIENT_IDS.forEach(id -> assertEquals(id, producer.getClientId()));
         producer.close();
     }
 
+    @Test
+    public void testUnusedConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
+        ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(props,
+                new StringSerializer(), new StringSerializer()));
+
+        assertTrue(new ProducerConfig(config.originals(), false).unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));

Review comment:
       Is this test necessary? Do we still have a case where we pass in a RecordingMap to  ProducerConfig?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org