You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/16 12:35:07 UTC
kafka git commit: KAFKA-3840; Allow clients default OS buffer sizes
Repository: kafka
Updated Branches:
refs/heads/trunk eb2619cac -> 54ba2280f
KAFKA-3840; Allow clients default OS buffer sizes
Follow-up to KAFKA-724 (#1469) that allows auto-tuning of OS socket buffer sizes for both the broker and the clients.
Author: Sebastien Launay <se...@opendns.com>
Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>, Grant Henke <gr...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #1507 from slaunay/enhancement/os-socket-buffer-size-tuning-for-clients
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54ba2280
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54ba2280
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54ba2280
Branch: refs/heads/trunk
Commit: 54ba2280f0c42b0bd3d74c197f97d8a12617a847
Parents: eb2619c
Author: Sebastien Launay <se...@opendns.com>
Authored: Thu Jun 16 14:34:58 2016 +0200
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Jun 16 14:34:58 2016 +0200
----------------------------------------------------------------------
.../kafka/clients/consumer/ConsumerConfig.java | 4 +--
.../kafka/clients/producer/ProducerConfig.java | 4 +--
.../clients/consumer/KafkaConsumerTest.java | 30 ++++++++++++++++++++
.../clients/producer/KafkaProducerTest.java | 28 ++++++++++++++++++
4 files changed, 62 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index b7fc1d2..de10bed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -240,13 +240,13 @@ public class ConsumerConfig extends AbstractConfig {
.define(SEND_BUFFER_CONFIG,
Type.INT,
128 * 1024,
- atLeast(0),
+ atLeast(-1),
Importance.MEDIUM,
CommonClientConfigs.SEND_BUFFER_DOC)
.define(RECEIVE_BUFFER_CONFIG,
Type.INT,
64 * 1024,
- atLeast(0),
+ atLeast(-1),
Importance.MEDIUM,
CommonClientConfigs.RECEIVE_BUFFER_DOC)
.define(FETCH_MIN_BYTES_CONFIG,
http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 47eb309..c493f67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -232,8 +232,8 @@ public class ProducerConfig extends AbstractConfig {
.define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
- .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
- .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
+ .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
+ .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
.define(MAX_REQUEST_SIZE_CONFIG,
Type.INT,
1 * 1024 * 1024,
http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 694faf2..3cbb62f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
@@ -59,7 +60,9 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -95,6 +98,33 @@ public class KafkaConsumerTest {
}
@Test
+ public void testOsDefaultSocketBufferSizes() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ config.put(ConsumerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+ config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+ config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ consumer.close();
+ }
+
+ @Test(expected = KafkaException.class)
+ public void testInvalidSocketSendBufferSize() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ config.put(ConsumerConfig.SEND_BUFFER_CONFIG, -2);
+ new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ }
+
+ @Test(expected = KafkaException.class)
+ public void testInvalidSocketReceiveBufferSize() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, -2);
+ new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ }
+
+ @Test
public void testSubscription() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 2dada8c..461e3cf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -95,4 +96,31 @@ public class KafkaProducerTest {
MockProducerInterceptor.resetCounters();
}
}
+
+ @Test
+ public void testOsDefaultSocketBufferSizes() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ config.put(ProducerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+ config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+ KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
+ config, new ByteArraySerializer(), new ByteArraySerializer());
+ producer.close();
+ }
+
+ @Test(expected = KafkaException.class)
+ public void testInvalidSocketSendBufferSize() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ config.put(ProducerConfig.SEND_BUFFER_CONFIG, -2);
+ new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
+ }
+
+ @Test(expected = KafkaException.class)
+ public void testInvalidSocketReceiveBufferSize() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, -2);
+ new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
+ }
}