You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/10/25 23:27:16 UTC

[kafka] branch trunk updated: MINOR: Simplify handling of KafkaProducer serializer overrides

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b403a09  MINOR: Simplify handling of KafkaProducer serializer overrides
b403a09 is described below

commit b403a09ec0d78a352411414aaac0663885f777c0
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Oct 25 16:26:55 2018 -0700

    MINOR: Simplify handling of KafkaProducer serializer overrides
    
    Author: Ismael Juma <is...@juma.me.uk>
    
    Reviewers: Dong Lin <li...@gmail.com>
    
    Closes #5781 from ijuma/simplify-producer-constructor
---
 .../kafka/clients/producer/KafkaProducer.java      |  30 +++--
 .../kafka/clients/producer/KafkaProducerTest.java  | 127 ++++++++++-----------
 2 files changed, 85 insertions(+), 72 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c68a014..3fcb2c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -269,7 +270,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *
      */
     public KafkaProducer(final Map<String, Object> configs) {
-        this(new ProducerConfig(configs), null, null, null, null, null, Time.SYSTEM);
+        this(configs, null, null, null, null, null, Time.SYSTEM);
     }
 
     /**
@@ -286,8 +287,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *                         be called in the producer when the serializer is passed in directly.
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
-            keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
+        this(configs, keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
     }
 
     /**
@@ -298,7 +298,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param properties   The producer configs
      */
     public KafkaProducer(Properties properties) {
-        this(new ProducerConfig(properties), null, null, null, null, null, Time.SYSTEM);
+        this(propsToMap(properties), null, null, null, null, null, Time.SYSTEM);
     }
 
     /**
@@ -313,19 +313,21 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *                         be called in the producer when the serializer is passed in directly.
      */
     public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
-                keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
+        this(propsToMap(properties), keySerializer, valueSerializer, null, null, null,
+                Time.SYSTEM);
     }
 
     // visible for testing
     @SuppressWarnings("unchecked")
-    KafkaProducer(ProducerConfig config,
+    KafkaProducer(Map<String, Object> configs,
                   Serializer<K> keySerializer,
                   Serializer<V> valueSerializer,
                   Metadata metadata,
                   KafkaClient kafkaClient,
                   ProducerInterceptors interceptors,
                   Time time) {
+        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
+                valueSerializer));
         try {
             Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
@@ -1188,6 +1190,20 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
+    /** Copies {@code properties} into a String-keyed map; throws ConfigException on any non-String key. */
+    private static Map<String, Object> propsToMap(Properties properties) {
+        Map<String, Object> converted = new HashMap<>(properties.size());
+        for (Map.Entry<Object, Object> property : properties.entrySet()) {
+            Object key = property.getKey();
+            if (!(key instanceof String)) {
+                throw new ConfigException(key.toString(), property.getValue(), "Key must be a string.");
+            }
+            String name = (String) key;
+            converted.put(name, properties.get(name));
+        }
+        return converted;
+    }
+
     private ClusterResourceListeners configureClusterResourceListeners(Serializer<K> keySerializer, Serializer<V> valueSerializer, List<?>... candidateLists) {
         ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
         for (List<?> candidateList: candidateLists)
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 77fcb51..b5d7709 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -128,6 +128,18 @@ public class KafkaProducerTest {
     }
 
     @Test
+    public void testConstructorWithNotStringKey() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.put(1, "not string key"); // Properties accepts non-String keys; the constructor is expected to reject them
+        try (KafkaProducer<?, ?> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
+            fail("Constructor should throw exception");
+        } catch (ConfigException e) {
+            assertTrue("Unexpected exception message: " + e.getMessage(), e.getMessage().contains("not string key"));
+        }
+    }
+
+    @Test
     public void testSerializerClose() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
@@ -197,10 +209,10 @@ public class KafkaProducerTest {
 
     @Test
     public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
-        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "1");
+        Map<String, Object> configs = new HashMap();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
+        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
 
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -212,9 +224,8 @@ public class KafkaProducerTest {
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
 
-        final Producer<String, String> producer = new KafkaProducer<>(
-            new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-            new StringSerializer(), new StringSerializer(), metadata, client, null, time);
+        final Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time);
 
         ExecutorService executor = Executors.newSingleThreadExecutor();
         final AtomicReference<Exception> closeException = new AtomicReference<>();
@@ -278,10 +289,8 @@ public class KafkaProducerTest {
 
     @Test
     public void testMetadataFetch() throws InterruptedException {
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
-                new StringSerializer()));
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         String topic = "topic";
         Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
         final Cluster emptyCluster = new Cluster(null, nodes,
@@ -299,8 +308,8 @@ public class KafkaProducerTest {
         // Return empty cluster 4 times and cluster from then on
         when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, cluster);
 
-        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null,
-                metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, new StringSerializer(),
+                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
             @Override
             Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
                 // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
@@ -332,10 +341,8 @@ public class KafkaProducerTest {
 
     @Test
     public void testMetadataFetchOnStaleMetadata() throws Exception {
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
-                new StringSerializer()));
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         String topic = "topic";
         ProducerRecord<String, String> initialRecord = new ProducerRecord<>(topic, "value");
         // Create a record with a partition higher than the initial (outdated) partition range
@@ -374,8 +381,8 @@ public class KafkaProducerTest {
             return emptyCluster;
         });
 
-        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null,
-                metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, new StringSerializer(),
+                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
             @Override
             Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
                 // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
@@ -419,19 +426,17 @@ public class KafkaProducerTest {
 
     @Test
     public void testTopicRefreshInMetadata() throws InterruptedException {
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
-        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
-            new StringSerializer()));
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
         long refreshBackoffMs = 500L;
         long metadataExpireMs = 60000L;
         final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true,
                 true, new ClusterResourceListeners());
         final Time time = new MockTime();
         final String topic = "topic";
-        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config, null, null, metadata,
-                null, null, time)) {
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
+                new StringSerializer(), metadata, null, null, time)) {
 
             Thread t = new Thread(() -> {
                 long startTimeMs = System.currentTimeMillis();
@@ -466,14 +471,12 @@ public class KafkaProducerTest {
     }
 
     private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) {
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
         Serializer<String> keySerializer = mock(serializerClassToMock);
         @SuppressWarnings("unchecked")
         Serializer<String> valueSerializer = mock(serializerClassToMock);
-        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, keySerializer,
-                valueSerializer));
 
         String topic = "topic";
         final Cluster cluster = new Cluster(
@@ -485,8 +488,8 @@ public class KafkaProducerTest {
         Metadata metadata = new Metadata(0, 90000, true);
         metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
 
-        KafkaProducer<String, String> producer = new KafkaProducer<>(config, keySerializer, valueSerializer,
-                metadata, null, null, Time.SYSTEM);
+        KafkaProducer<String, String> producer = new KafkaProducer<>(configs, keySerializer, valueSerializer, metadata,
+                null, null, Time.SYSTEM);
 
         when(keySerializer.serialize(any(), any(), any())).then(invocation ->
                 invocation.<String>getArgument(2).getBytes());
@@ -543,11 +546,9 @@ public class KafkaProducerTest {
 
     @Test
     public void testInterceptorPartitionSetOnTooLargeRecord() {
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
-        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
-                new StringSerializer()));
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
         String topic = "topic";
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
 
@@ -562,8 +563,8 @@ public class KafkaProducerTest {
 
         @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
                 ProducerInterceptors<String, String> interceptors = mock(ProducerInterceptors.class);
-        KafkaProducer<String, String> producer = new KafkaProducer<>(config, null, null,
-                metadata, null, interceptors, Time.SYSTEM);
+        KafkaProducer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
+                new StringSerializer(), metadata, null, interceptors, Time.SYSTEM);
 
         when(interceptors.onSend(any())).then(invocation -> invocation.getArgument(0));
 
@@ -589,10 +590,10 @@ public class KafkaProducerTest {
 
     @Test(expected = TimeoutException.class)
     public void testInitTransactionTimeout() {
-        Properties props = new Properties();
-        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
-        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 
         Time time = Time.SYSTEM;
         Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -604,9 +605,8 @@ public class KafkaProducerTest {
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
 
-        try (Producer<String, String> producer = new KafkaProducer<>(
-                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                null, null, metadata, client, null, time)) {
+        try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time)) {
             producer.initTransactions();
             fail("initTransactions() should have raised TimeoutException");
         }
@@ -614,10 +614,10 @@ public class KafkaProducerTest {
 
     @Test(expected = KafkaException.class)
     public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
-        Properties props = new Properties();
-        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
-        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -629,9 +629,8 @@ public class KafkaProducerTest {
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
 
-        Producer<String, String> producer = new KafkaProducer<>(
-                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                null, null, metadata, client, null, time);
+        Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
+                metadata, client, null, time);
         try {
             producer.initTransactions();
         } catch (TimeoutException e) {
@@ -647,9 +646,9 @@ public class KafkaProducerTest {
 
     @Test
     public void testSendToInvalidTopic() throws Exception {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
-        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
 
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster();
@@ -661,9 +660,8 @@ public class KafkaProducerTest {
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
 
-        Producer<String, String> producer = new KafkaProducer<>(new ProducerConfig(
-                ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                null, null, metadata, client, null, time);
+        Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
+                metadata, client, null, time);
 
         String invalidTopicName = "topic abc"; // Invalid topic name due to space
         ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
@@ -690,9 +688,9 @@ public class KafkaProducerTest {
 
     @Test
     public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException {
-        Properties props = new Properties();
-        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 
         // Simulate a case where metadata for a particular topic is not available. This will cause KafkaProducer#send to
         // block in Metadata#awaitUpdate for the configured max.block.ms. When close() is invoked, KafkaProducer#send should
@@ -706,9 +704,8 @@ public class KafkaProducerTest {
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
 
-        Producer<String, String> producer = new KafkaProducer<>(
-                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, client, null, time);
+        Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
+                metadata, client, null, time);
 
         ExecutorService executor = Executors.newSingleThreadExecutor();
         final AtomicReference<Exception> sendException = new AtomicReference<>();