You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/14 18:33:13 UTC

git commit: KAFKA-1147 Consumer socket timeout should be greater than fetch max wait; reviewed by Neha Narkhede and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk e22f8eded -> 3d3c544d6


KAFKA-1147 Consumer socket timeout should be greater than fetch max wait; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3d3c544d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3d3c544d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3d3c544d

Branch: refs/heads/trunk
Commit: 3d3c544d6f87ddff58fc01be00410321ef0d497d
Parents: e22f8ed
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Sun Sep 14 09:32:55 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Sun Sep 14 09:33:02 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/ConsumerConfig.scala | 4 +++-
 core/src/main/scala/kafka/server/KafkaConfig.scala      | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3d3c544d/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index efcb4af..9ebbee6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -102,8 +102,10 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
    *  Set this explicitly for only testing purpose. */
   val consumerId: Option[String] = Option(props.getString("consumer.id", null))
 
-  /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
+  /** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */
   val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
+  require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
+    " to prevent unnecessary socket timeouts")
   
   /** the socket receive buffer for network requests */
   val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d3c544d/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1a45f87..dce48db 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -213,8 +213,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr */
   val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000)
 
-  /* the socket timeout for network requests */
+  /* the socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms. */
   val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
+  require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
+    " to prevent unnecessary socket timeouts")
 
   /* the socket receive buffer for network requests */
   val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize)