Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2013/07/23 01:18:49 UTC

[jira] [Commented] (KAFKA-984) Avoid a full rebalance in cases when a new topic is discovered but container/broker set stay the same

    [ https://issues.apache.org/jira/browse/KAFKA-984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13715843#comment-13715843 ] 

Guozhang Wang commented on KAFKA-984:
-------------------------------------

Approach:

1. In ZookeeperConsumerConnector.reinitializeConsumer, add three additional arguments:

partial: Boolean

addedTopics: Seq[String]

deletedTopics: Seq[String]

1. In WildcardStreamsHandler.handleTopicEvent, call reinitializeConsumer with partial = true and with addedTopics and deletedTopics set.

2. In ZookeeperConsumerConnector.reinitializeConsumer, if partial == true, take a separate branch for the code running from

    // map of {topic -> Set(thread-1, thread-2, ...)}
    val consumerThreadIdsPerTopic: Map[String, Set[String]] =
      topicCount.getConsumerThreadIdsPerTopic

to

    val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
    groupedByTopic.foreach(e => {
      val topic = e._1
      val streams = e._2.map(_._2._2).toList
      topicStreamsMap += (topic -> streams)
      debug("adding topic %s and %d streams to map.".format(topic, streams.size))
    })

and in that branch only update topicThreadIdAndQueues and topicStreamsMap.

* Note that we currently do not handle deleted topics, and this issue will not be fixed in this JIRA.

3. Add another function, syncedPartialRebalance, to ZKRebalancerListener, and make reinitializeConsumer call this function if partial == true. syncedPartialRebalance will use the same rebalanceLock as syncedRebalance.

4. ZKRebalancerListener keeps an in-memory list of the topics it is currently consuming from.

4. syncedPartialRebalance, which calls rebalanceForTopic, first checks whether the set of topics has changed by reading ZK and comparing the result with its in-memory list. If no added or deleted topics are found, it returns directly.

4.1. For each deleted topic, simply call closeFetchers, and delete the ownership/offsets of the topic in ZK.

4.2 For each added topic, read the number of consumers of the group from ZK (we only do this once for all topics) and the number of partitions of this topic, then assign the partitions to consumers using the same deterministic algorithm.

4.3 Try writing to ZK for all added topics. If the writes succeed, update the fetchers (start new threads) and return true; otherwise return false.
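
To make steps 4 through 4.2 concrete, here is a minimal sketch of the topic diff and the per-topic assignment. It assumes that "the same deterministic algorithm" means a range-style split over sorted partitions and sorted consumer thread ids; the object and method names (PartialRebalanceSketch, topicChanges, assign) are hypothetical and are not part of ZookeeperConsumerConnector. Reading from and writing to ZK is left out entirely.

```scala
object PartialRebalanceSketch {

  // Hypothetical helper: compare the in-memory topic list with the topics
  // currently registered in ZK. Returns (addedTopics, deletedTopics).
  def topicChanges(known: Set[String], inZk: Set[String]): (Set[String], Set[String]) =
    (inZk -- known, known -- inZk)

  // Assumed range-style assignment: every consumer sorts the same inputs and
  // therefore computes the same result without coordination.
  def assign(partitions: Seq[Int], consumerThreadIds: Seq[String]): Map[String, Seq[Int]] = {
    val sortedParts = partitions.sorted
    val sortedConsumers = consumerThreadIds.sorted
    if (sortedConsumers.isEmpty) Map.empty[String, Seq[Int]]
    else {
      val perConsumer = sortedParts.size / sortedConsumers.size
      val withExtra = sortedParts.size % sortedConsumers.size
      sortedConsumers.zipWithIndex.map { case (consumer, i) =>
        // The first `withExtra` consumers each take one additional partition.
        val start = perConsumer * i + math.min(i, withExtra)
        val count = perConsumer + (if (i < withExtra) 1 else 0)
        consumer -> sortedParts.slice(start, start + count)
      }.toMap
    }
  }
}
```

Because only the added topics go through assign, the ownership of partitions for topics that did not change is left untouched, which is the point of the partial rebalance.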

Considerations:

1. If a topic change and a consumer/broker change happen at the same time, two consumers could trigger the corresponding syncedRebalance and syncedPartialRebalance in different orders. In this case we would prefer to make syncedPartialRebalance fail fast so that everyone enters the syncedRebalance phase. One possible optimization is to check isWatcherTriggered at the beginning of syncedPartialRebalance and, if it is set, return false directly. syncedPartialRebalance should also not retry.

2. Stopping fetchers for only certain topics, i.e., sending only a partial threadIdMap to closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]], relevantTopicThreadIdsMap: Map[String, Set[String]]), has not been done before. It is not clear whether this works well.
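
The two considerations above could look roughly like the sketch below. Everything in it is a stand-in: ListenerSketch is not the real ZKRebalancerListener, rebalanceOnce is a hypothetical callback for the single partial rebalance attempt, and relevantThreadIds is a hypothetical helper that only builds the partial map; it says nothing about whether closeFetchers behaves correctly when given such a map.

```scala
object ConsiderationsSketch {

  // Stand-in for the listener: shares one lock with the full rebalance and
  // gives up immediately if a full rebalance has already been triggered.
  class ListenerSketch(rebalanceOnce: () => Boolean) {
    private val rebalanceLock = new Object
    @volatile var isWatcherTriggered: Boolean = false

    def syncedPartialRebalance(): Boolean = rebalanceLock.synchronized {
      if (isWatcherTriggered) false // a full rebalance is pending; let it win
      else rebalanceOnce()          // single attempt, no retry loop
    }
  }

  // Hypothetical helper: keep only the thread ids of the topics whose
  // fetchers should be stopped.
  def relevantThreadIds(all: Map[String, Set[String]],
                        topics: Set[String]): Map[String, Set[String]] =
    all.filter { case (topic, _) => topics.contains(topic) }
}
```

Returning false without retrying leaves recovery to syncedRebalance, which covers every topic anyway.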

                
> Avoid a full rebalance in cases when a new topic is discovered but container/broker set stay the same
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-984
>                 URL: https://issues.apache.org/jira/browse/KAFKA-984
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>             Fix For: 0.8
>
>
> Currently a full rebalance will be triggered on high level consumers even when just a new topic is added to ZK. Better avoid this behavior but only rebalance on this newly added topic.
