You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/01/17 19:25:53 UTC

[kafka] branch trunk updated: KAFKA-6382: Make ProducerConfig and ConsumerConfig constructors public (#4341)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 11f3db0  KAFKA-6382: Make ProducerConfig and ConsumerConfig constructors public (#4341)
11f3db0 is described below

commit 11f3db0b731739017174e488670e529af4dc22ae
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Jan 17 11:25:50 2018 -0800

    KAFKA-6382: Make ProducerConfig and ConsumerConfig constructors public (#4341)
    
    * KIP-234
    * update constructors to accept Properties and Map<String,Object>
    * use ProducerConfig to access BATCH_SIZE default value in Streams
---
 .../org/apache/kafka/clients/consumer/ConsumerConfig.java   |  9 ++++++---
 .../org/apache/kafka/clients/producer/KafkaProducer.java    |  7 +++++--
 .../org/apache/kafka/clients/producer/ProducerConfig.java   |  9 +++++++--
 .../main/java/org/apache/kafka/streams/StreamsConfig.java   | 13 ++++++++++---
 4 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index be3077f..3fe58d7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -456,8 +456,7 @@ public class ConsumerConfig extends AbstractConfig {
     public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
                                                               Deserializer<?> keyDeserializer,
                                                               Deserializer<?> valueDeserializer) {
-        Map<String, Object> newConfigs = new HashMap<String, Object>();
-        newConfigs.putAll(configs);
+        Map<String, Object> newConfigs = new HashMap<>(configs);
         if (keyDeserializer != null)
             newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
         if (valueDeserializer != null)
@@ -477,7 +476,11 @@ public class ConsumerConfig extends AbstractConfig {
         return newProperties;
     }
 
-    ConsumerConfig(Map<?, ?> props) {
+    public ConsumerConfig(Properties props) {
+        super(CONFIG, props);
+    }
+
+    public ConsumerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 14f405b..4e67fe8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -267,7 +267,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param configs   The producer configs
      *
      */
-    public KafkaProducer(Map<String, Object> configs) {
+    public KafkaProducer(final Map<String, Object> configs) {
         this(new ProducerConfig(configs), null, null, null, null);
     }
 
@@ -286,7 +286,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
-                keySerializer, valueSerializer, null, null);
+            keySerializer,
+            valueSerializer,
+            null,
+            null);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9bf3f4..0631814 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -347,7 +347,8 @@ public class ProducerConfig extends AbstractConfig {
     }
 
     public static Properties addSerializerToConfig(Properties properties,
-                                                   Serializer<?> keySerializer, Serializer<?> valueSerializer) {
+                                                   Serializer<?> keySerializer,
+                                                   Serializer<?> valueSerializer) {
         Properties newProperties = new Properties();
         newProperties.putAll(properties);
         if (keySerializer != null)
@@ -357,7 +358,11 @@ public class ProducerConfig extends AbstractConfig {
         return newProperties;
     }
 
-    ProducerConfig(Map<?, ?> props) {
+    public ProducerConfig(Properties props) {
+        super(CONFIG, props);
+    }
+
+    public ProducerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b4908e8..0feb48d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -46,6 +46,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -765,8 +766,8 @@ public class StreamsConfig extends AbstractConfig {
         consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
 
         // add admin retries configs for creating topics
-        final AdminClientConfig config = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
-        consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), config.getInt(AdminClientConfig.RETRIES_CONFIG));
+        final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
+        consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG));
 
         // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
         final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
@@ -774,7 +775,13 @@ public class StreamsConfig extends AbstractConfig {
         if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
             final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
             final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
-            final int batchSize = producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG) ? Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString()) : 16384;
+            final int batchSize;
+            if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
+                batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
+            } else {
+                final ProducerConfig producerDefaultConfig = new ProducerConfig(new Properties());
+                batchSize = producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG);
+            }
 
             if (segmentSize < batchSize) {
                 throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].