You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/10/01 03:24:08 UTC
[kafka] branch trunk updated: KAFKA-10326: Both serializer and
deserializer should be able to see generated ID (#9102)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8090ad KAFKA-10326: Both serializer and deserializer should be able to see generated ID (#9102)
b8090ad is described below
commit b8090add335903e6daf38ea99d0f94be5a5e6ed4
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu Oct 1 11:23:07 2020 +0800
KAFKA-10326: Both serializer and deserializer should be able to see generated ID (#9102)
Reviewers: Boyang Chen <bo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../kafka/clients/consumer/KafkaConsumer.java | 4 ++--
.../kafka/clients/producer/KafkaProducer.java | 4 ++--
.../apache/kafka/common/config/AbstractConfig.java | 7 ++++++
.../kafka/clients/consumer/KafkaConsumerTest.java | 27 ++++++++++++++++++++++
.../kafka/clients/producer/KafkaProducerTest.java | 27 ++++++++++++++++++++++
.../kafka/common/config/AbstractConfigTest.java | 9 ++++++++
6 files changed, 74 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 91070f9..ac996cd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -704,14 +704,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.interceptors = new ConsumerInterceptors<>(interceptorList);
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- this.keyDeserializer.configure(config.originals(), true);
+ this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer = keyDeserializer;
}
if (valueDeserializer == null) {
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- this.valueDeserializer.configure(config.originals(), false);
+ this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
this.valueDeserializer = valueDeserializer;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 12ecc5c..ebd1746 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -363,7 +363,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
- this.keySerializer.configure(config.originals(), true);
+ this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
@@ -371,7 +371,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
- this.valueSerializer.configure(config.originals(), false);
+ this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 968c549..a747e50 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -228,6 +228,13 @@ public class AbstractConfig {
return copy;
}
+ public Map<String, Object> originals(Map<String, Object> configOverrides) {
+ Map<String, Object> copy = new RecordingMap<>();
+ copy.putAll(originals);
+ copy.putAll(configOverrides);
+ return copy;
+ }
+
/**
* Get all the original settings, ensuring that all values are of type String.
* @return the original settings
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 401dde2..1248845 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2535,4 +2535,31 @@ public class KafkaConsumerTest {
assertEquals(countingRebalanceListener.revokedCount, 1);
}
+
+ @Test
+ public void deserializerShouldSeeGeneratedClientId() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DeserializerForClientId.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerForClientId.class.getName());
+
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
+ assertEquals(2, DeserializerForClientId.CLIENT_IDS.size());
+ assertEquals(DeserializerForClientId.CLIENT_IDS.get(0), consumer.getClientId());
+ assertEquals(DeserializerForClientId.CLIENT_IDS.get(1), consumer.getClientId());
+ consumer.close();
+ }
+
+ public static class DeserializerForClientId implements Deserializer<byte[]> {
+ static final List<String> CLIENT_IDS = new ArrayList<>();
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ CLIENT_IDS.add(configs.get(ConsumerConfig.CLIENT_ID_CONFIG).toString());
+ }
+
+ @Override
+ public byte[] deserialize(String topic, byte[] data) {
+ return data;
+ }
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 68667b9..0c809f5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -1276,4 +1276,31 @@ public class KafkaProducerTest {
new LogContext(), new ClusterResourceListeners(), Time.SYSTEM);
}
+ @Test
+ public void serializerShouldSeeGeneratedClientId() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SerializerForClientId.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SerializerForClientId.class.getName());
+
+ KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+ assertEquals(2, SerializerForClientId.CLIENT_IDS.size());
+ assertEquals(SerializerForClientId.CLIENT_IDS.get(0), producer.getClientId());
+ assertEquals(SerializerForClientId.CLIENT_IDS.get(1), producer.getClientId());
+ producer.close();
+ }
+
+ public static class SerializerForClientId implements Serializer<byte[]> {
+ static final List<String> CLIENT_IDS = new ArrayList<>();
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ CLIENT_IDS.add(configs.get(ProducerConfig.CLIENT_ID_CONFIG).toString());
+ }
+
+ @Override
+ public byte[] serialize(String topic, byte[] data) {
+ return data;
+ }
+ }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 73f83d7..04085ae 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -335,6 +335,15 @@ public class AbstractConfigTest {
}
@Test
+ public void testOriginalWithOverrides() {
+ Properties props = new Properties();
+ props.put("config.providers", "file");
+ TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
+ assertEquals(config.originals().get("config.providers"), "file");
+ assertEquals(config.originals(Collections.singletonMap("config.providers", "file2")).get("config.providers"), "file2");
+ }
+
+ @Test
public void testOriginalsWithConfigProvidersProps() {
Properties props = new Properties();