You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/09/14 18:33:13 UTC
git commit: KAFKA-1147 Consumer socket timeout should be greater than
fetch max wait; reviewed by Neha Narkhede and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk e22f8eded -> 3d3c544d6
KAFKA-1147 Consumer socket timeout should be greater than fetch max wait; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3d3c544d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3d3c544d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3d3c544d
Branch: refs/heads/trunk
Commit: 3d3c544d6f87ddff58fc01be00410321ef0d497d
Parents: e22f8ed
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Sun Sep 14 09:32:55 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Sun Sep 14 09:33:02 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/consumer/ConsumerConfig.scala | 4 +++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 +++-
2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d3c544d/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index efcb4af..9ebbee6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -102,8 +102,10 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
* Set this explicitly for only testing purpose. */
val consumerId: Option[String] = Option(props.getString("consumer.id", null))
- /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
+ /** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */
val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
+ require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
+ " to prevent unnecessary socket timeouts")
/** the socket receive buffer for network requests */
val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d3c544d/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1a45f87..dce48db 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -213,8 +213,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr */
val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000)
- /* the socket timeout for network requests */
+ /* the socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms. */
val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
+ require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
+ " to prevent unnecessary socket timeouts")
/* the socket receive buffer for network requests */
val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize)