You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/10/01 03:24:08 UTC

[kafka] branch trunk updated: KAFKA-10326: Both serializer and deserializer should be able to see generated ID (#9102)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8090ad  KAFKA-10326: Both serializer and deserializer should be able to see generated ID (#9102)
b8090ad is described below

commit b8090add335903e6daf38ea99d0f94be5a5e6ed4
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu Oct 1 11:23:07 2020 +0800

    KAFKA-10326: Both serializer and deserializer should be able to see generated ID (#9102)
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  4 ++--
 .../kafka/clients/producer/KafkaProducer.java      |  4 ++--
 .../apache/kafka/common/config/AbstractConfig.java |  7 ++++++
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 27 ++++++++++++++++++++++
 .../kafka/clients/producer/KafkaProducerTest.java  | 27 ++++++++++++++++++++++
 .../kafka/common/config/AbstractConfigTest.java    |  9 ++++++++
 6 files changed, 74 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 91070f9..ac996cd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -704,14 +704,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.interceptors = new ConsumerInterceptors<>(interceptorList);
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-                this.keyDeserializer.configure(config.originals(), true);
+                this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
             } else {
                 config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                 this.keyDeserializer = keyDeserializer;
             }
             if (valueDeserializer == null) {
                 this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-                this.valueDeserializer.configure(config.originals(), false);
+                this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
             } else {
                 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                 this.valueDeserializer = valueDeserializer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 12ecc5c..ebd1746 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -363,7 +363,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             if (keySerializer == null) {
                 this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                          Serializer.class);
-                this.keySerializer.configure(config.originals(), true);
+                this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
             } else {
                 config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                 this.keySerializer = keySerializer;
@@ -371,7 +371,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             if (valueSerializer == null) {
                 this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                            Serializer.class);
-                this.valueSerializer.configure(config.originals(), false);
+                this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
             } else {
                 config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                 this.valueSerializer = valueSerializer;
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 968c549..a747e50 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -228,6 +228,13 @@ public class AbstractConfig {
         return copy;
     }
 
+    public Map<String, Object> originals(Map<String, Object> configOverrides) {
+        Map<String, Object> copy = new RecordingMap<>();
+        copy.putAll(originals);
+        copy.putAll(configOverrides);
+        return copy;
+    }
+
     /**
      * Get all the original settings, ensuring that all values are of type String.
      * @return the original settings
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 401dde2..1248845 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2535,4 +2535,31 @@ public class KafkaConsumerTest {
 
         assertEquals(countingRebalanceListener.revokedCount, 1);
     }
+
+    @Test
+    public void deserializerShouldSeeGeneratedClientId() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DeserializerForClientId.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerForClientId.class.getName());
+
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
+        assertEquals(2, DeserializerForClientId.CLIENT_IDS.size());
+        assertEquals(DeserializerForClientId.CLIENT_IDS.get(0), consumer.getClientId());
+        assertEquals(DeserializerForClientId.CLIENT_IDS.get(1), consumer.getClientId());
+        consumer.close();
+    }
+
+    public static class DeserializerForClientId implements Deserializer<byte[]> {
+        static final List<String> CLIENT_IDS = new ArrayList<>();
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            CLIENT_IDS.add(configs.get(ConsumerConfig.CLIENT_ID_CONFIG).toString());
+        }
+
+        @Override
+        public byte[] deserialize(String topic, byte[] data) {
+            return data;
+        }
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 68667b9..0c809f5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -1276,4 +1276,31 @@ public class KafkaProducerTest {
                 new LogContext(), new ClusterResourceListeners(), Time.SYSTEM);
     }
 
+    @Test
+    public void serializerShouldSeeGeneratedClientId() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SerializerForClientId.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SerializerForClientId.class.getName());
+
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+        assertEquals(2, SerializerForClientId.CLIENT_IDS.size());
+        assertEquals(SerializerForClientId.CLIENT_IDS.get(0), producer.getClientId());
+        assertEquals(SerializerForClientId.CLIENT_IDS.get(1), producer.getClientId());
+        producer.close();
+    }
+
+    public static class SerializerForClientId implements Serializer<byte[]> {
+        static final List<String> CLIENT_IDS = new ArrayList<>();
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            CLIENT_IDS.add(configs.get(ProducerConfig.CLIENT_ID_CONFIG).toString());
+        }
+
+        @Override
+        public byte[] serialize(String topic, byte[] data) {
+            return data;
+        }
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 73f83d7..04085ae 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -335,6 +335,15 @@ public class AbstractConfigTest {
     }
 
     @Test
+    public void testOriginalWithOverrides() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+        assertEquals(config.originals().get("config.providers"), "file");
+        assertEquals(config.originals(Collections.singletonMap("config.providers", "file2")).get("config.providers"), "file2");
+    }
+
+    @Test
     public void testOriginalsWithConfigProvidersProps() {
         Properties props = new Properties();