Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2013/07/01 06:33:19 UTC

[jira] [Commented] (KAFKA-956) High-level consumer fails to check topic metadata response for errors

    [ https://issues.apache.org/jira/browse/KAFKA-956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13696465#comment-13696465 ] 

Jun Rao commented on KAFKA-956:
-------------------------------

One possible solution is to let the consumer read the partition data from ZK directly. That way, if a consumer finds that a topic doesn't exist, a ZK watcher is guaranteed to be triggered when the topic is created later. The only problem is that if there are many topics, reading them one at a time from ZK can be slow. ZK 3.4.x has multi API support, and we do plan to upgrade to that version. Perhaps we can revisit this issue at that point?
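
One way to picture that approach (a minimal sketch, not a patch; it assumes the 0.8 ZK layout under /brokers/topics and the ZkClient library the consumer already depends on, and DirectTopicReader / onTopicCreated are made-up names):

    import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
    import scala.collection.JavaConverters._

    // Reads a topic's partition ids straight from ZK. If the topic isn't there
    // yet, it leaves a child watch on /brokers/topics so that topic creation
    // later fires a callback instead of being silently missed.
    class DirectTopicReader(zkClient: ZkClient, onTopicCreated: String => Unit) {
      private val BrokerTopicsPath = "/brokers/topics"

      def partitionsFor(topic: String): Option[Seq[String]] = {
        val topicPath = BrokerTopicsPath + "/" + topic
        if (zkClient.exists(topicPath)) {
          // Partition ids are the children of the topic's partitions znode.
          Some(zkClient.getChildren(topicPath + "/partitions").asScala.toSeq)
        } else {
          // Topic missing: watch the parent so we're notified when it appears.
          zkClient.subscribeChildChanges(BrokerTopicsPath, new IZkChildListener {
            def handleChildChange(parentPath: String, children: java.util.List[String]): Unit = {
              if (children != null && children.contains(topic))
                onTopicCreated(topic)
            }
          })
          None
        }
      }
    }

The cost mentioned above is the per-topic round trip: with many topics this is one ZK read per topic, which is what the batched operations in the ZK 3.4.x multi API would help with.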
                
> High-level consumer fails to check topic metadata response for errors
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-956
>                 URL: https://issues.apache.org/jira/browse/KAFKA-956
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Neha Narkhede
>            Priority: Blocker
>             Fix For: 0.8
>
>         Attachments: consumer_metadata_fetch.patch
>
>
> In our environment we noticed that consumers would sometimes hang when started too soon after the Kafka server. I tracked this down, and it seems to be related to some code in the rebalance logic (ZookeeperConsumerConnector.scala). In particular, the following code seems problematic:
>       val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
>                                                           brokers,
>                                                           config.clientId,
>                                                           config.socketTimeoutMs,
>                                                           correlationId.getAndIncrement).topicsMetadata
>       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
>       topicsMetadata.foreach(m => {
>         val topic = m.topic
>         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
>         partitionsPerTopicMap.put(topic, partitions)
>       })
> The response is never checked for errors, so it may not actually contain any partition info! Rebalance goes on its merry way, but doesn't know about any partitions, so it never assigns them...
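
For reference, a minimal sketch of the kind of check the attached patch presumably adds: inspect each TopicMetadata's error code before trusting its partition list. ErrorMapping.NoError and the TopicMetadata fields match the 0.8 codebase, but the exact handling here (throwing a KafkaException) is an illustrative assumption, not the actual patch:

    import kafka.api.TopicMetadata
    import kafka.common.{ErrorMapping, KafkaException}
    import scala.collection.mutable

    // Build the topic -> partition-id map, but refuse to proceed when a
    // metadata entry carries an error code instead of real partition info.
    def partitionsByTopic(topicsMetadata: Seq[TopicMetadata]): mutable.HashMap[String, Seq[Int]] = {
      val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
      topicsMetadata.foreach { m =>
        if (m.errorCode == ErrorMapping.NoError)
          partitionsPerTopicMap.put(m.topic, m.partitionsMetadata.map(_.partitionId))
        else
          // Surfacing the error lets the rebalance retry rather than silently
          // assigning no partitions to the consumer.
          throw new KafkaException(
            "Error while fetching metadata for topic %s (error code %d)".format(m.topic, m.errorCode))
      }
      partitionsPerTopicMap
    }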
