You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/07 00:19:38 UTC

kafka git commit: KAFKA-724; Allow automatic socket.send.buffer from operating system in SocketServer

Repository: kafka
Updated Branches:
  refs/heads/trunk 0cee0c321 -> 430bf56cd


KAFKA-724; Allow automatic socket.send.buffer from operating system in SocketServer

If socket.receive.buffer.bytes or socket.send.buffer.bytes is set to -1, the broker does not set the corresponding socket buffer size explicitly, so the OS default is used.

Author: Joshi <re...@gmail.com>
Author: Rekha Joshi <re...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #1469 from rekhajoshm/KAFKA-724-rebased


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/430bf56c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/430bf56c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/430bf56c

Branch: refs/heads/trunk
Commit: 430bf56cdfa32baffad21bb37dd50c080491dc03
Parents: 0cee0c3
Author: Joshi <re...@gmail.com>
Authored: Tue Jun 7 00:05:33 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Jun 7 00:05:33 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/CommonClientConfigs.java  | 4 ++--
 core/src/main/scala/kafka/network/SocketServer.scala        | 9 ++++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala          | 4 ++--
 3 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/430bf56c/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 750b8a1..3327815 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -43,10 +43,10 @@ public class CommonClientConfigs {
     public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
 
     public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
-    public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data.";
+    public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.";
 
     public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
-    public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.";
+    public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.";
 
     public static final String CLIENT_ID_CONFIG = "client.id";
     public static final String CLIENT_ID_DOC = "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.";

http://git-wip-us.apache.org/repos/asf/kafka/blob/430bf56c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b757abd..f00dd7b 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -32,7 +32,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, LoginType, Mode, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, LoginType, Mode, Selectable, Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -304,7 +304,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         new InetSocketAddress(host, port)
     val serverChannel = ServerSocketChannel.open()
     serverChannel.configureBlocking(false)
-    serverChannel.socket().setReceiveBufferSize(recvBufferSize)
+    if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
+      serverChannel.socket().setReceiveBufferSize(recvBufferSize)
+
     try {
       serverChannel.socket.bind(socketAddress)
       info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
@@ -326,7 +328,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
       socketChannel.configureBlocking(false)
       socketChannel.socket().setTcpNoDelay(true)
       socketChannel.socket().setKeepAlive(true)
-      socketChannel.socket().setSendBufferSize(sendBufferSize)
+      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
+        socketChannel.socket().setSendBufferSize(sendBufferSize)
 
       debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
             .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,

http://git-wip-us.apache.org/repos/asf/kafka/blob/430bf56c/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ca66f9d..f9a12a9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -396,8 +396,8 @@ object KafkaConfig {
   val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." +
   " In IaaS environments, this may need to be different from the interface to which the broker binds." +
   " If this is not set, the value for `listeners` will be used."
-  val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets"
-  val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets"
+  val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
+  val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
   val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request"
   val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address"
   val MaxConnectionsPerIpOverridesDoc = "Per-ip or hostname overrides to the default maximum number of connections"