You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/10/25 23:27:16 UTC
[kafka] branch trunk updated: MINOR: Simplify handling of
KafkaProducer serializer overrides
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b403a09 MINOR: Simplify handling of KafkaProducer serializer overrides
b403a09 is described below
commit b403a09ec0d78a352411414aaac0663885f777c0
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Oct 25 16:26:55 2018 -0700
MINOR: Simplify handling of KafkaProducer serializer overrides
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Dong Lin <li...@gmail.com>
Closes #5781 from ijuma/simplify-producer-constructor
---
.../kafka/clients/producer/KafkaProducer.java | 30 +++--
.../kafka/clients/producer/KafkaProducerTest.java | 127 ++++++++++-----------
2 files changed, 85 insertions(+), 72 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c68a014..3fcb2c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -269,7 +270,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
*
*/
public KafkaProducer(final Map<String, Object> configs) {
- this(new ProducerConfig(configs), null, null, null, null, null, Time.SYSTEM);
+ this(configs, null, null, null, null, null, Time.SYSTEM);
}
/**
@@ -286,8 +287,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
- this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
- keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
+ this(configs, keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
}
/**
@@ -298,7 +298,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* @param properties The producer configs
*/
public KafkaProducer(Properties properties) {
- this(new ProducerConfig(properties), null, null, null, null, null, Time.SYSTEM);
+ this(propsToMap(properties), null, null, null, null, null, Time.SYSTEM);
}
/**
@@ -313,19 +313,21 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
- this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
- keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
+ this(propsToMap(properties), keySerializer, valueSerializer, null, null, null,
+ Time.SYSTEM);
}
// visible for testing
@SuppressWarnings("unchecked")
- KafkaProducer(ProducerConfig config,
+ KafkaProducer(Map<String, Object> configs,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
Metadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors interceptors,
Time time) {
+ ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
+ valueSerializer));
try {
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
@@ -1188,6 +1190,20 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
+ private static Map<String, Object> propsToMap(Properties properties) {
+ Map<String, Object> map = new HashMap<>(properties.size());
+ for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ if (entry.getKey() instanceof String) {
+ String k = (String) entry.getKey();
+ map.put(k, properties.get(k));
+ }
+ else {
+ throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
+ }
+ }
+ return map;
+ }
+
private ClusterResourceListeners configureClusterResourceListeners(Serializer<K> keySerializer, Serializer<V> valueSerializer, List<?>... candidateLists) {
ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
for (List<?> candidateList: candidateLists)
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 77fcb51..b5d7709 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -128,6 +128,18 @@ public class KafkaProducerTest {
}
@Test
+ public void testConstructorWithNotStringKey() {
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.put(1, "not string key");
+ try (KafkaProducer<?, ?> ff = new KafkaProducer(props, new StringSerializer(), new StringSerializer())) {
+ fail("Constructor should throw exception");
+ } catch (ConfigException e) {
+ assertTrue("Unexpected exception message: " + e.getMessage(), e.getMessage().contains("not string key"));
+ }
+ }
+
+ @Test
public void testSerializerClose() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
@@ -197,10 +209,10 @@ public class KafkaProducerTest {
@Test
public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
- props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "1");
+ Map<String, Object> configs = new HashMap();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
+ configs.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
Time time = new MockTime();
Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -212,9 +224,8 @@ public class KafkaProducerTest {
MockClient client = new MockClient(time, metadata);
client.setNode(node);
- final Producer<String, String> producer = new KafkaProducer<>(
- new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
- new StringSerializer(), new StringSerializer(), metadata, client, null, time);
+ final Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
+ new StringSerializer(), metadata, client, null, time);
ExecutorService executor = Executors.newSingleThreadExecutor();
final AtomicReference<Exception> closeException = new AtomicReference<>();
@@ -278,10 +289,8 @@ public class KafkaProducerTest {
@Test
public void testMetadataFetch() throws InterruptedException {
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
- new StringSerializer()));
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
String topic = "topic";
Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
final Cluster emptyCluster = new Cluster(null, nodes,
@@ -299,8 +308,8 @@ public class KafkaProducerTest {
// Return empty cluster 4 times and cluster from then on
when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, cluster);
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null,
- metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
+ KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, new StringSerializer(),
+ new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
@Override
Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
// give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
@@ -332,10 +341,8 @@ public class KafkaProducerTest {
@Test
public void testMetadataFetchOnStaleMetadata() throws Exception {
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
- new StringSerializer()));
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
String topic = "topic";
ProducerRecord<String, String> initialRecord = new ProducerRecord<>(topic, "value");
// Create a record with a partition higher than the initial (outdated) partition range
@@ -374,8 +381,8 @@ public class KafkaProducerTest {
return emptyCluster;
});
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null,
- metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
+ KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, new StringSerializer(),
+ new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
@Override
Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
// give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
@@ -419,19 +426,17 @@ public class KafkaProducerTest {
@Test
public void testTopicRefreshInMetadata() throws InterruptedException {
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
- ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
- new StringSerializer()));
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
long refreshBackoffMs = 500L;
long metadataExpireMs = 60000L;
final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true,
true, new ClusterResourceListeners());
final Time time = new MockTime();
final String topic = "topic";
- try (KafkaProducer<String, String> producer = new KafkaProducer<>(config, null, null, metadata,
- null, null, time)) {
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
+ new StringSerializer(), metadata, null, null, time)) {
Thread t = new Thread(() -> {
long startTimeMs = System.currentTimeMillis();
@@ -466,14 +471,12 @@ public class KafkaProducerTest {
}
private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) {
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
@SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
Serializer<String> keySerializer = mock(serializerClassToMock);
@SuppressWarnings("unchecked")
Serializer<String> valueSerializer = mock(serializerClassToMock);
- ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, keySerializer,
- valueSerializer));
String topic = "topic";
final Cluster cluster = new Cluster(
@@ -485,8 +488,8 @@ public class KafkaProducerTest {
Metadata metadata = new Metadata(0, 90000, true);
metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
- KafkaProducer<String, String> producer = new KafkaProducer<>(config, keySerializer, valueSerializer,
- metadata, null, null, Time.SYSTEM);
+ KafkaProducer<String, String> producer = new KafkaProducer<>(configs, keySerializer, valueSerializer, metadata,
+ null, null, Time.SYSTEM);
when(keySerializer.serialize(any(), any(), any())).then(invocation ->
invocation.<String>getArgument(2).getBytes());
@@ -543,11 +546,9 @@ public class KafkaProducerTest {
@Test
public void testInterceptorPartitionSetOnTooLargeRecord() {
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
- ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
- new StringSerializer()));
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
String topic = "topic";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
@@ -562,8 +563,8 @@ public class KafkaProducerTest {
@SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
ProducerInterceptors<String, String> interceptors = mock(ProducerInterceptors.class);
- KafkaProducer<String, String> producer = new KafkaProducer<>(config, null, null,
- metadata, null, interceptors, Time.SYSTEM);
+ KafkaProducer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
+ new StringSerializer(), metadata, null, interceptors, Time.SYSTEM);
when(interceptors.onSend(any())).then(invocation -> invocation.getArgument(0));
@@ -589,10 +590,10 @@ public class KafkaProducerTest {
@Test(expected = TimeoutException.class)
public void testInitTransactionTimeout() {
- Properties props = new Properties();
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
Time time = Time.SYSTEM;
Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -604,9 +605,8 @@ public class KafkaProducerTest {
MockClient client = new MockClient(time, metadata);
client.setNode(node);
- try (Producer<String, String> producer = new KafkaProducer<>(
- new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
- null, null, metadata, client, null, time)) {
+ try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
+ new StringSerializer(), metadata, client, null, time)) {
producer.initTransactions();
fail("initTransactions() should have raised TimeoutException");
}
@@ -614,10 +614,10 @@ public class KafkaProducerTest {
@Test(expected = KafkaException.class)
public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
- Properties props = new Properties();
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
Time time = new MockTime();
Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -629,9 +629,8 @@ public class KafkaProducerTest {
MockClient client = new MockClient(time, metadata);
client.setNode(node);
- Producer<String, String> producer = new KafkaProducer<>(
- new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
- null, null, metadata, client, null, time);
+ Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
+ metadata, client, null, time);
try {
producer.initTransactions();
} catch (TimeoutException e) {
@@ -647,9 +646,9 @@ public class KafkaProducerTest {
@Test
public void testSendToInvalidTopic() throws Exception {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
Time time = new MockTime();
Cluster cluster = TestUtils.singletonCluster();
@@ -661,9 +660,8 @@ public class KafkaProducerTest {
MockClient client = new MockClient(time, metadata);
client.setNode(node);
- Producer<String, String> producer = new KafkaProducer<>(new ProducerConfig(
- ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
- null, null, metadata, client, null, time);
+ Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
+ metadata, client, null, time);
String invalidTopicName = "topic abc"; // Invalid topic name due to space
ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
@@ -690,9 +688,9 @@ public class KafkaProducerTest {
@Test
public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException {
- Properties props = new Properties();
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
// Simulate a case where metadata for a particular topic is not available. This will cause KafkaProducer#send to
// block in Metadata#awaitUpdate for the configured max.block.ms. When close() is invoked, KafkaProducer#send should
@@ -706,9 +704,8 @@ public class KafkaProducerTest {
MockClient client = new MockClient(time, metadata);
client.setNode(node);
- Producer<String, String> producer = new KafkaProducer<>(
- new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
- new StringSerializer(), new StringSerializer(), metadata, client, null, time);
+ Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
+ metadata, client, null, time);
ExecutorService executor = Executors.newSingleThreadExecutor();
final AtomicReference<Exception> sendException = new AtomicReference<>();