You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC

[18/28] git commit: KAFKA-714: ConsoleConsumer throws SocketTimeoutException when fetching topic metadata; reviewed by Neha Narkhede

KAFKA-714: ConsoleConsumer throws SocketTimeoutException when fetching topic metadata; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7833c894
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7833c894
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7833c894

Branch: refs/heads/trunk
Commit: 7833c894a89779b4d49c329b16d66b3296f6c09d
Parents: 311a5d8
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Jan 23 14:22:31 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Jan 23 14:22:36 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/client/ClientUtils.scala |    3 ++-
 .../scala/kafka/consumer/ConsoleConsumer.scala     |    6 ++++++
 .../kafka/consumer/ConsumerFetcherManager.scala    |    5 ++++-
 .../consumer/ZookeeperConsumerConnector.scala      |    5 ++++-
 .../scala/kafka/tools/SimpleConsumerShell.scala    |    2 +-
 5 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7833c894/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index af5d231..a3d88ea 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -58,10 +58,11 @@ object ClientUtils extends Logging{
    * @param clientId The client's identifier
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int): TopicMetadataResponse = {
     val props = new Properties()
     props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
     props.put("client.id", clientId)
+    props.put("request.timeout.ms", timeoutMs.toString)
     val producerConfig = new ProducerConfig(props)
     fetchTopicMetadata(topics, brokers, producerConfig, 0)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7833c894/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 5dffa7e..7e84043 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -79,6 +79,11 @@ object ConsoleConsumer extends Logging {
             .describedAs("size")
             .ofType(classOf[java.lang.Integer])
             .defaultsTo(2 * 1024 * 1024)
+    val socketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "The socket timeout used for the connection to the broker")
+            .withRequiredArg
+            .describedAs("ms")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(ConsumerConfig.SocketTimeout)
     val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " +
             "of time without incoming messages")
             .withRequiredArg
@@ -146,6 +151,7 @@ object ConsoleConsumer extends Logging {
     val props = new Properties()
     props.put("group.id", options.valueOf(groupIdOpt))
     props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
+    props.put("socket.timeout.ms", options.valueOf(socketTimeoutMsOpt).toString)
     props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
     props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString)
     props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7833c894/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 69c6b3e..3aa7b08 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -54,7 +54,10 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         try {
           trace("Partitions without leader %s".format(noLeaderPartitionSet))
           val brokers = getAllBrokersInCluster(zkClient)
-          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId).topicsMetadata
+          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
+                                                              brokers,
+                                                              config.clientId,
+                                                              config.socketTimeoutMs).topicsMetadata
           val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
           topicsMetadata.foreach(
             tmd => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7833c894/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c1f8513..a780a41 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -400,7 +400,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val brokers = getAllBrokersInCluster(zkClient)
-      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
+      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
+                                                          brokers,
+                                                          config.clientId,
+                                                          config.socketTimeoutMs).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
       topicsMetadata.foreach(m =>{
         val topic = m.topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/7833c894/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 1842c03..d8127a8 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -125,7 +125,7 @@ object SimpleConsumerShell extends Logging {
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId).topicsMetadata
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)