You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/09/17 23:22:53 UTC

git commit: KAFKA-1030 Addition of partitions requires bouncing all the consumers of that topic; reviewed by Neha, Swapnil, Joel

Updated Branches:
  refs/heads/0.8 aebf74619 -> c6ca97173


KAFKA-1030 Addition of partitions requires bouncing all the consumers of that topic; reviewed by Neha, Swapnil, Joel


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6ca9717
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6ca9717
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6ca9717

Branch: refs/heads/0.8
Commit: c6ca971738700643ecba78ad1f3998062481aab9
Parents: aebf746
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Sep 17 14:22:48 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Sep 17 14:22:48 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ZookeeperConsumerConnector.scala   | 14 ++------------
 1 file changed, 2 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6ca9717/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 81bf0bd..881f51e 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -31,7 +31,6 @@ import java.util.UUID
 import kafka.serializer._
 import kafka.utils.ZkUtils._
 import kafka.common._
-import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
 import kafka.metrics._
 import scala.Some
@@ -422,17 +421,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         true
       }
       else {
-        val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
-                                                            brokers,
-                                                            config.clientId,
-                                                            config.socketTimeoutMs,
-                                                            correlationId.getAndIncrement).topicsMetadata
-        val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
-        topicsMetadata.foreach(m => {
-          val topic = m.topic
-          val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
-          partitionsPerTopicMap.put(topic, partitions)
-        })
+        val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq)
+        val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq))
 
         /**
          * fetchers must be stopped to avoid data duplication, since if the current