You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/01/17 19:25:53 UTC
[kafka] branch trunk updated: KAFKA-6382: Make ProducerConfig and
ConsumerConfig constructors public (#4341)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 11f3db0 KAFKA-6382: Make ProducerConfig and ConsumerConfig constructors public (#4341)
11f3db0 is described below
commit 11f3db0b731739017174e488670e529af4dc22ae
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Jan 17 11:25:50 2018 -0800
KAFKA-6382: Make ProducerConfig and ConsumerConfig constructors public (#4341)
* KIP-234
* update constructors to accept Properties and Map<String,Object>
* use ProducerConfig to access BATCH_SIZE default value in Streams
---
.../org/apache/kafka/clients/consumer/ConsumerConfig.java | 9 ++++++---
.../org/apache/kafka/clients/producer/KafkaProducer.java | 7 +++++--
.../org/apache/kafka/clients/producer/ProducerConfig.java | 9 +++++++--
.../main/java/org/apache/kafka/streams/StreamsConfig.java | 13 ++++++++++---
4 files changed, 28 insertions(+), 10 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index be3077f..3fe58d7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -456,8 +456,7 @@ public class ConsumerConfig extends AbstractConfig {
public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer) {
- Map<String, Object> newConfigs = new HashMap<String, Object>();
- newConfigs.putAll(configs);
+ Map<String, Object> newConfigs = new HashMap<>(configs);
if (keyDeserializer != null)
newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
if (valueDeserializer != null)
@@ -477,7 +476,11 @@ public class ConsumerConfig extends AbstractConfig {
return newProperties;
}
- ConsumerConfig(Map<?, ?> props) {
+ public ConsumerConfig(Properties props) {
+ super(CONFIG, props);
+ }
+
+ public ConsumerConfig(Map<String, Object> props) {
super(CONFIG, props);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 14f405b..4e67fe8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -267,7 +267,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* @param configs The producer configs
*
*/
- public KafkaProducer(Map<String, Object> configs) {
+ public KafkaProducer(final Map<String, Object> configs) {
this(new ProducerConfig(configs), null, null, null, null);
}
@@ -286,7 +286,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
*/
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
- keySerializer, valueSerializer, null, null);
+ keySerializer,
+ valueSerializer,
+ null,
+ null);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9bf3f4..0631814 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -347,7 +347,8 @@ public class ProducerConfig extends AbstractConfig {
}
public static Properties addSerializerToConfig(Properties properties,
- Serializer<?> keySerializer, Serializer<?> valueSerializer) {
+ Serializer<?> keySerializer,
+ Serializer<?> valueSerializer) {
Properties newProperties = new Properties();
newProperties.putAll(properties);
if (keySerializer != null)
@@ -357,7 +358,11 @@ public class ProducerConfig extends AbstractConfig {
return newProperties;
}
- ProducerConfig(Map<?, ?> props) {
+ public ProducerConfig(Properties props) {
+ super(CONFIG, props);
+ }
+
+ public ProducerConfig(Map<String, Object> props) {
super(CONFIG, props);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b4908e8..0feb48d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -46,6 +46,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -765,8 +766,8 @@ public class StreamsConfig extends AbstractConfig {
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
// add admin retries configs for creating topics
- final AdminClientConfig config = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
- consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), config.getInt(AdminClientConfig.RETRIES_CONFIG));
+ final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
+ consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG));
// verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
@@ -774,7 +775,13 @@ public class StreamsConfig extends AbstractConfig {
if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
- final int batchSize = producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG) ? Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString()) : 16384;
+ final int batchSize;
+ if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
+ batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
+ } else {
+ final ProducerConfig producerDefaultConfig = new ProducerConfig(new Properties());
+ batchSize = producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG);
+ }
if (segmentSize < batchSize) {
throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",
--
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].