You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC
[18/28] git commit: KAFKA-714: ConsoleConsumer throws
SocketTimeoutException when fetching topic metadata; reviewed by Neha Narkhede
KAFKA-714: ConsoleConsumer throws SocketTimeoutException when fetching topic metadata; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7833c894
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7833c894
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7833c894
Branch: refs/heads/trunk
Commit: 7833c894a89779b4d49c329b16d66b3296f6c09d
Parents: 311a5d8
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Jan 23 14:22:31 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Jan 23 14:22:36 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/client/ClientUtils.scala | 3 ++-
.../scala/kafka/consumer/ConsoleConsumer.scala | 6 ++++++
.../kafka/consumer/ConsumerFetcherManager.scala | 5 ++++-
.../consumer/ZookeeperConsumerConnector.scala | 5 ++++-
.../scala/kafka/tools/SimpleConsumerShell.scala | 2 +-
5 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7833c894/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index af5d231..a3d88ea 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -58,10 +58,11 @@ object ClientUtils extends Logging{
* @param clientId The client's identifier
* @return topic metadata response
*/
- def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
+ def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int): TopicMetadataResponse = {
val props = new Properties()
props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
props.put("client.id", clientId)
+ props.put("request.timeout.ms", timeoutMs.toString)
val producerConfig = new ProducerConfig(props)
fetchTopicMetadata(topics, brokers, producerConfig, 0)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7833c894/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 5dffa7e..7e84043 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -79,6 +79,11 @@ object ConsoleConsumer extends Logging {
.describedAs("size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(2 * 1024 * 1024)
+ val socketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "The socket timeout used for the connection to the broker")
+ .withRequiredArg
+ .describedAs("ms")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(ConsumerConfig.SocketTimeout)
val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " +
"of time without incoming messages")
.withRequiredArg
@@ -146,6 +151,7 @@ object ConsoleConsumer extends Logging {
val props = new Properties()
props.put("group.id", options.valueOf(groupIdOpt))
props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
+ props.put("socket.timeout.ms", options.valueOf(socketTimeoutMsOpt).toString)
props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString)
props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7833c894/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 69c6b3e..3aa7b08 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -54,7 +54,10 @@ class ConsumerFetcherManager(private val consumerIdString: String,
try {
trace("Partitions without leader %s".format(noLeaderPartitionSet))
val brokers = getAllBrokersInCluster(zkClient)
- val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId).topicsMetadata
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
+ brokers,
+ config.clientId,
+ config.socketTimeoutMs).topicsMetadata
val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
topicsMetadata.foreach(
tmd => {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7833c894/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c1f8513..a780a41 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -400,7 +400,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
val brokers = getAllBrokersInCluster(zkClient)
- val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
+ brokers,
+ config.clientId,
+ config.socketTimeoutMs).topicsMetadata
val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
topicsMetadata.foreach(m =>{
val topic = m.topic
http://git-wip-us.apache.org/repos/asf/kafka/blob/7833c894/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 1842c03..d8127a8 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -125,7 +125,7 @@ object SimpleConsumerShell extends Logging {
// getting topic metadata
info("Getting topic metatdata...")
val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
- val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId).topicsMetadata
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
System.exit(1)