You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/16 12:35:07 UTC

kafka git commit: KAFKA-3840; Allow clients default OS buffer sizes

Repository: kafka
Updated Branches:
  refs/heads/trunk eb2619cac -> 54ba2280f


KAFKA-3840; Allow clients default OS buffer sizes

Follow up on KAFKA-724 (#1469) to allow OS socket buffer sizes auto tuning for both the broker and the clients.

Author: Sebastien Launay <se...@opendns.com>

Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>, Grant Henke <gr...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1507 from slaunay/enhancement/os-socket-buffer-size-tuning-for-clients


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54ba2280
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54ba2280
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54ba2280

Branch: refs/heads/trunk
Commit: 54ba2280f0c42b0bd3d74c197f97d8a12617a847
Parents: eb2619c
Author: Sebastien Launay <se...@opendns.com>
Authored: Thu Jun 16 14:34:58 2016 +0200
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Jun 16 14:34:58 2016 +0200

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  4 +--
 .../kafka/clients/producer/ProducerConfig.java  |  4 +--
 .../clients/consumer/KafkaConsumerTest.java     | 30 ++++++++++++++++++++
 .../clients/producer/KafkaProducerTest.java     | 28 ++++++++++++++++++
 4 files changed, 62 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index b7fc1d2..de10bed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -240,13 +240,13 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(SEND_BUFFER_CONFIG,
                                         Type.INT,
                                         128 * 1024,
-                                        atLeast(0),
+                                        atLeast(-1),
                                         Importance.MEDIUM,
                                         CommonClientConfigs.SEND_BUFFER_DOC)
                                 .define(RECEIVE_BUFFER_CONFIG,
                                         Type.INT,
                                         64 * 1024,
-                                        atLeast(0),
+                                        atLeast(-1),
                                         Importance.MEDIUM,
                                         CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                 .define(FETCH_MIN_BYTES_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 47eb309..c493f67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -232,8 +232,8 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
-                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
-                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
+                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
+                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                 .define(MAX_REQUEST_SIZE_CONFIG,
                                         Type.INT,
                                         1 * 1024 * 1024,

http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 694faf2..3cbb62f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
@@ -59,7 +60,9 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -95,6 +98,33 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    public void testOsDefaultSocketBufferSizes() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        config.put(ConsumerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+        config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        consumer.close();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testInvalidSocketSendBufferSize() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        config.put(ConsumerConfig.SEND_BUFFER_CONFIG, -2);
+        new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testInvalidSocketReceiveBufferSize() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, -2);
+        new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    }
+
+    @Test
     public void testSubscription() {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 2dada8c..461e3cf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -95,4 +96,31 @@ public class KafkaProducerTest {
             MockProducerInterceptor.resetCounters();
         }
     }
+
+    @Test
+    public void testOsDefaultSocketBufferSizes() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        config.put(ProducerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+        config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
+                config, new ByteArraySerializer(), new ByteArraySerializer());
+        producer.close();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testInvalidSocketSendBufferSize() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        config.put(ProducerConfig.SEND_BUFFER_CONFIG, -2);
+        new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testInvalidSocketReceiveBufferSize() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, -2);
+        new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
+    }
 }