You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/30 02:55:25 UTC

git commit: KAFKA-687; Provide a roundrobin partition assignment strategy that considers partitions from all topics; reviewed by Jun Rao and Guozhang Wang.

Repository: kafka
Updated Branches:
  refs/heads/trunk 538122fa7 -> 953e35b5c


KAFKA-687; Provide a roundrobin partition assignment strategy that considers partitions from all topics; reviewed by Jun Rao and Guozhang Wang.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/953e35b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/953e35b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/953e35b5

Branch: refs/heads/trunk
Commit: 953e35b5c591559a600b4e6bd01d1dd1f2e979db
Parents: 538122f
Author: Joel Koshy <jj...@gmail.com>
Authored: Fri Aug 29 17:55:14 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Aug 29 17:55:14 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/ConsumerConfig.scala   |   4 +
 .../kafka/consumer/PartitionAssignor.scala      | 161 +++++++++++++++
 .../main/scala/kafka/consumer/TopicCount.scala  |  39 ++--
 .../consumer/ZookeeperConsumerConnector.scala   |  95 ++++-----
 .../scala/kafka/metrics/KafkaMetricsGroup.scala |   2 +
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   6 +-
 .../kafka/consumer/PartitionAssignorTest.scala  | 207 +++++++++++++++++++
 7 files changed, 441 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 1cf2f62..efcb4af 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -49,6 +49,7 @@ object ConsumerConfig extends Config {
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val ExcludeInternalTopics = true
+  val DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
 
@@ -175,6 +176,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
   val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
 
+  /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
+  val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
+  
   validate(this)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
new file mode 100644
index 0000000..8ea7368
--- /dev/null
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.common.TopicAndPartition
+import kafka.utils.{Utils, ZkUtils, Logging}
+
+trait PartitionAssignor {
+
+  /**
+   * Assigns partitions to consumer instances in a group.
+   * @return An assignment map of partition to consumer thread. This only includes assignments for threads that belong
+   *         to the given assignment-context's consumer.
+   */
+  def assign(ctx: AssignmentContext): scala.collection.Map[TopicAndPartition, ConsumerThreadId]
+
+}
+
+object PartitionAssignor {
+  def createInstance(assignmentStrategy: String) = assignmentStrategy match {
+    case "roundrobin" => new RoundRobinAssignor()
+    case _ => new RangeAssignor()
+  }
+}
+
+class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) {
+  val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
+    val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
+    myTopicCount.getConsumerThreadIdsPerTopic
+  }
+
+  val partitionsForTopic: collection.Map[String, Seq[Int]] =
+    ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq)
+
+  val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
+    ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)
+
+  val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted
+}
+
+/**
+ * The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It
+ * then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer
+ * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
+ * will be within a delta of exactly one across all consumer threads.)
+ *
+ * (For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance
+ * and thread-id within that instance. Therefore, round-robin assignment is allowed only if:
+ * a) Every topic has the same number of streams within a consumer instance
+ * b) The set of subscribed topics is identical for every consumer instance within the group.
+ */
+
+class RoundRobinAssignor() extends PartitionAssignor with Logging {
+
+  def assign(ctx: AssignmentContext) = {
+    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
+
+    // check conditions (a) and (b)
+    val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet)
+    ctx.consumersForTopic.foreach { case (topic, threadIds) =>
+      val threadIdSet = threadIds.toSet
+      require(threadIdSet == headThreadIdSet,
+              "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " +
+              "AND if the stream counts across topics are identical for a given consumer instance.\n" +
+              "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) +
+              "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet))
+    }
+
+    val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted)
+
+    info("Starting round-robin assignment with consumers " + ctx.consumers)
+    val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
+      info("Consumer %s rebalancing the following partitions for topic %s: %s"
+           .format(ctx.consumerId, topic, partitions))
+      partitions.map(partition => {
+        TopicAndPartition(topic, partition)
+      })
+    }.toSeq.sortWith((topicPartition1, topicPartition2) => {
+      /*
+       * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
+       * up on one consumer (if it has a high enough stream count).
+       */
+      topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
+    })
+
+    allTopicPartitions.foreach(topicPartition => {
+      val threadId = threadAssignor.next()
+      if (threadId.consumer == ctx.consumerId)
+        partitionOwnershipDecision += (topicPartition -> threadId)
+    })
+
+    partitionOwnershipDecision
+  }
+}
+
+/**
+ * Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
+ * and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of
+ * consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly
+ * divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1
+ * and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread
+ * will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be:
+ * p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1
+ */
+class RangeAssignor() extends PartitionAssignor with Logging {
+
+  def assign(ctx: AssignmentContext) = {
+    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
+
+    for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
+      val curConsumers = ctx.consumersForTopic(topic)
+      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)
+
+      val nPartsPerConsumer = curPartitions.size / curConsumers.size
+      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
+
+      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
+        " for topic " + topic + " with consumers: " + curConsumers)
+
+      for (consumerThreadId <- consumerThreadIdSet) {
+        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
+        assert(myConsumerPosition >= 0)
+        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
+        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
+
+        /**
+         *   Range-partition the sorted partitions to consumers for better locality.
+         *  The first few consumers pick up an extra partition, if any.
+         */
+        if (nParts <= 0)
+          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
+        else {
+          for (i <- startPart until startPart + nParts) {
+            val partition = curPartitions(i)
+            info(consumerThreadId + " attempting to claim partition " + partition)
+            // record the partition ownership decision
+            partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
+          }
+        }
+      }
+    }
+
+    partitionOwnershipDecision
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 8b0ae57..0954b3c 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -24,28 +24,37 @@ import kafka.common.KafkaException
 
 private[kafka] trait TopicCount {
 
-  def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
+  def getConsumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]]
   def getTopicCountMap: Map[String, Int]
   def pattern: String
-  
-  protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
-                                            topicCountMap: Map[String,  Int]) = {
-    val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
+
+}
+
+case class ConsumerThreadId(consumer: String, threadId: Int) extends Ordered[ConsumerThreadId] {
+  override def toString = "%s-%d".format(consumer, threadId)
+
+  def compare(that: ConsumerThreadId) = toString.compare(that.toString)
+}
+
+private[kafka] object TopicCount extends Logging {
+  val whiteListPattern = "white_list"
+  val blackListPattern = "black_list"
+  val staticPattern = "static"
+
+  def makeThreadId(consumerIdString: String, threadId: Int) = consumerIdString + "-" + threadId
+
+  def makeConsumerThreadIdsPerTopic(consumerIdString: String,
+                                    topicCountMap: Map[String,  Int]) = {
+    val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]()
     for ((topic, nConsumers) <- topicCountMap) {
-      val consumerSet = new mutable.HashSet[String]
+      val consumerSet = new mutable.HashSet[ConsumerThreadId]
       assert(nConsumers >= 1)
       for (i <- 0 until nConsumers)
-        consumerSet += consumerIdString + "-" + i
+        consumerSet += ConsumerThreadId(consumerIdString, i)
       consumerThreadIdsPerTopicMap.put(topic, consumerSet)
     }
     consumerThreadIdsPerTopicMap
   }
-}
-
-private[kafka] object TopicCount extends Logging {
-  val whiteListPattern = "white_list"
-  val blackListPattern = "black_list"
-  val staticPattern = "static"
 
   def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient, excludeInternalTopics: Boolean) : TopicCount = {
     val dirs = new ZKGroupDirs(group)
@@ -101,7 +110,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
                                 val topicCountMap: Map[String, Int])
                                 extends TopicCount {
 
-  def getConsumerThreadIdsPerTopic = makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
+  def getConsumerThreadIdsPerTopic = TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
 
   override def equals(obj: Any): Boolean = {
     obj match {
@@ -124,7 +133,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
   def getConsumerThreadIdsPerTopic = {
     val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
                          .filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics))
-    makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
+    TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
   }
 
   def getTopicCountMap = Map(Utils.JSONEscapeString(topicFilter.regex) -> numStreams)

http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index acfd064..21f3e00 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -90,7 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
   private val checkpointedOffsets = new Pool[TopicAndPartition, Long]
-  private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
+  private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
   private val messageStreamCreated = new AtomicBoolean(false)
 
@@ -514,9 +514,21 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
+
+    private val partitionAssignor = PartitionAssignor.createInstance(config.partitionAssignmentStrategy)
+
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
+    
+    @volatile private var allTopicsOwnedPartitionsCount = 0
+    newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] {
+      def value() = allTopicsOwnedPartitionsCount
+    })
+
+    private def ownedPartitionsCountMetricName(topic: String) =
+      "%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic)
+
     private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
       override def run() {
         info("starting watcher executor thread for consumer " + consumerIdString)
@@ -565,10 +577,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]])= {
       info("Releasing partition ownership")
       for ((topic, infos) <- localTopicRegistry) {
-        for(partition <- infos.keys)
+        for(partition <- infos.keys) {
           deletePartitionOwnershipFromZK(topic, partition)
+        }
+        removeMetric(ownedPartitionsCountMetricName(topic))
         localTopicRegistry.remove(topic)
       }
+      allTopicsOwnedPartitionsCount = 0
     }
 
     def resetState() {
@@ -618,7 +633,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private def rebalance(cluster: Cluster): Boolean = {
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(
         group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
-      val consumersPerTopicMap = getConsumersPerTopic(zkClient, group, config.excludeInternalTopics)
       val brokers = getAllBrokersInCluster(zkClient)
       if (brokers.size == 0) {
         // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
@@ -629,9 +643,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         true
       }
       else {
-        val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq)
-        val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq.sorted))
-
         /**
          * fetchers must be stopped to avoid data duplication, since if the current
          * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
@@ -642,67 +653,41 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
         releasePartitionOwnership(topicRegistry)
 
-        var partitionOwnershipDecision = new collection.mutable.HashMap[TopicAndPartition, String]()
-        val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
-
-        for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
-          currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
-
-          val curConsumers = consumersPerTopicMap.get(topic).get
-          val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
-
-          val nPartsPerConsumer = curPartitions.size / curConsumers.size
-          val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
-
-          info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
-            " for topic " + topic + " with consumers: " + curConsumers)
-
-          for (consumerThreadId <- consumerThreadIdSet) {
-            val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
-            assert(myConsumerPosition >= 0)
-            val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
-            val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
-
-            /**
-             *   Range-partition the sorted partitions to consumers for better locality.
-             *  The first few consumers pick up an extra partition, if any.
-             */
-            if (nParts <= 0)
-              warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
-            else {
-              for (i <- startPart until startPart + nParts) {
-                val partition = curPartitions(i)
-                info(consumerThreadId + " attempting to claim partition " + partition)
-                // record the partition ownership decision
-                partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
-              }
-            }
-          }
-        }
+        val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
+        val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
+        val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
+          valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
 
         // fetch current offsets for all topic-partitions
         val topicPartitions = partitionOwnershipDecision.keySet.toSeq
+
         val offsetFetchResponseOpt = fetchOffsets(topicPartitions)
 
         if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
           false
         else {
           val offsetFetchResponse = offsetFetchResponseOpt.get
-          topicPartitions.foreach { topicAndPartition =>
+          topicPartitions.foreach(topicAndPartition => {
             val (topic, partition) = topicAndPartition.asTuple
             val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
             val threadId = partitionOwnershipDecision(topicAndPartition)
             addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
-          }
+          })
 
           /**
            * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
            * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
            */
-          if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
-            info("Updating the cache")
-            debug("Partitions per topic cache " + partitionsPerTopicMap)
-            debug("Consumers per topic cache " + consumersPerTopicMap)
+          if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
+            allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size
+
+            partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
+                                      .foreach { case (topic, partitionThreadPairs) =>
+              newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] {
+                def value() = partitionThreadPairs.size
+              })
+            }
+
             topicRegistry = currentTopicRegistry
             updateFetcher(cluster)
             true
@@ -753,7 +738,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
 
     private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]],
-                              relevantTopicThreadIdsMap: Map[String, Set[String]]) {
+                              relevantTopicThreadIdsMap: Map[String, Set[ConsumerThreadId]]) {
       // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
       // after this rebalancing attempt
       val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
@@ -776,7 +761,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       }
     }
 
-    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, String]): Boolean = {
+    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, ConsumerThreadId]): Boolean = {
       var successfullyOwnedPartitions : List[(String, Int)] = Nil
       val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
         val topic = partitionOwner._1.topic
@@ -784,7 +769,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         val consumerThreadId = partitionOwner._2
         val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition)
         try {
-          createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
+          createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId.toString)
           info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
           successfullyOwnedPartitions ::= (topic, partition)
           true
@@ -808,8 +793,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
                                       partition: Int, topic: String,
-                                      offset: Long, consumerThreadId: String) {
-      val partTopicInfoMap = currentTopicRegistry.get(topic)
+                                      offset: Long, consumerThreadId: ConsumerThreadId) {
+      val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic)
 
       val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
@@ -852,7 +837,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
 
     // map of {topic -> Set(thread-1, thread-2, ...)}
-    val consumerThreadIdsPerTopic: Map[String, Set[String]] =
+    val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
       topicCount.getConsumerThreadIdsPerTopic
 
     val allQueuesAndStreams = topicCount match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 00df462..2313a57 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -72,6 +72,8 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
     new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
     new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
     new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"),
+    new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"),
 
     // kafka.consumer.ConsumerFetcherManager
     new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),

http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index dcdc1ce..a7b1fdc 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -18,7 +18,7 @@
 package kafka.utils
 
 import kafka.cluster.{Broker, Cluster}
-import kafka.consumer.TopicCount
+import kafka.consumer.{ConsumerThreadId, TopicCount}
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
   ZkMarshallingError, ZkBadVersionException}
@@ -658,10 +658,10 @@ object ZkUtils extends Logging {
     getChildren(zkClient, dirs.consumerRegistryDir)
   }
 
-  def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[String]] = {
+  def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = {
     val dirs = new ZKGroupDirs(group)
     val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
-    val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
+    val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
     for (consumer <- consumers) {
       val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient, excludeInternalTopics)
       for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
new file mode 100644
index 0000000..9ceae22
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.consumer
+
+import org.scalatest.junit.JUnit3Suite
+import org.easymock.EasyMock
+import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.data.Stat
+import kafka.consumer._
+import kafka.utils.{TestUtils, Logging, ZkUtils, Json}
+import junit.framework.Assert._
+import kafka.common.TopicAndPartition
+import unit.kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
+import kafka.consumer.ConsumerThreadId
+import unit.kafka.consumer.PartitionAssignorTest.Scenario
+import unit.kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo
+
+class PartitionAssignorTest extends JUnit3Suite with Logging {
+
+  def testRoundRobinPartitionAssignor() {
+    val assignor = new RoundRobinAssignor
+
+    /** various scenarios with only wildcard consumers */
+    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+      val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+      val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
+
+      val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+        ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
+      }).toSeq:_*)
+      
+      val subscriptions = Map((1 to consumerCount).map(consumer => {
+        val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
+        ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))
+      }).toSeq:_*)
+      val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+      val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkClient)
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient, verifyAssignmentIsUniform = true)
+    })
+  }
+
+  def testRangePartitionAssignor() {
+    val assignor = new RangeAssignor
+    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+      val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+      val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
+
+      val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+        ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
+      }).toSeq:_*)
+
+      val subscriptions = Map((1 to consumerCount).map(consumer => {
+        val streamCounts = Map((1 to topicCount).map(topic => {
+            val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
+            ("topic-" + topic, streamCount)
+          }).toSeq:_*)
+        ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
+      }).toSeq:_*)
+      val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+      val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkClient)
+
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient)
+    })
+  }
+}
+
+private object PartitionAssignorTest extends Logging {
+
+  private val TestCaseCount = 3
+  private val MaxConsumerCount = 10
+  private val MaxStreamCount = 8
+  private val MaxTopicCount = 100
+  private val MinTopicCount = 20
+  private val MaxPartitionCount = 120
+  private val MinPartitionCount = 8
+
+  private trait SubscriptionInfo {
+    def registrationString: String
+  }
+
+  private case class StaticSubscriptionInfo(streamCounts: Map[String, Int]) extends SubscriptionInfo {
+    def registrationString =
+      Json.encode(Map("version" -> 1,
+                      "subscription" -> streamCounts,
+                      "pattern" -> "static",
+                      "timestamp" -> 1234.toString))
+
+    override def toString = {
+      "Stream counts: " + streamCounts
+    }
+  }
+
+  private case class WildcardSubscriptionInfo(streamCount: Int, regex: String, isWhitelist: Boolean)
+          extends SubscriptionInfo {
+    def registrationString =
+      Json.encode(Map("version" -> 1,
+                      "subscription" -> Map(regex -> streamCount),
+                      "pattern" -> (if (isWhitelist) "white_list" else "black_list")))
+
+    override def toString = {
+      "\"%s\":%d (%s)".format(regex, streamCount, if (isWhitelist) "whitelist" else "blacklist")
+    }
+  }
+
+  private case class Scenario(group: String,
+                              topicPartitionCounts: Map[String, Int],
+                              /* consumerId -> SubscriptionInfo */
+                              subscriptions: Map[String, SubscriptionInfo]) {
+    override def toString = {
+      "\n" +
+      "Group                  : %s\n".format(group) +
+      "Topic partition counts : %s\n".format(topicPartitionCounts) +
+      "Consumer subscriptions : %s\n".format(subscriptions)
+    }
+  }
+
+  private def setupZkClientMock(scenario: Scenario) = {
+    val consumers = java.util.Arrays.asList(scenario.subscriptions.keys.toSeq:_*)
+
+    val zkClient = EasyMock.createStrictMock(classOf[ZkClient])
+    EasyMock.checkOrder(zkClient, false)
+
+    EasyMock.expect(zkClient.getChildren("/consumers/%s/ids".format(scenario.group))).andReturn(consumers)
+    EasyMock.expectLastCall().anyTimes()
+
+    scenario.subscriptions.foreach { case(consumerId, subscriptionInfo) =>
+      EasyMock.expect(zkClient.readData("/consumers/%s/ids/%s".format(scenario.group, consumerId), new Stat()))
+              .andReturn(subscriptionInfo.registrationString)
+      EasyMock.expectLastCall().anyTimes()
+    }
+
+    scenario.topicPartitionCounts.foreach { case(topic, partitionCount) =>
+      val replicaAssignment = Map((0 until partitionCount).map(partition => (partition.toString, Seq(0))):_*)
+      EasyMock.expect(zkClient.readData("/brokers/topics/%s".format(topic), new Stat()))
+              .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment))
+      EasyMock.expectLastCall().anyTimes()
+    }
+
+    EasyMock.expect(zkClient.getChildren("/brokers/topics")).andReturn(
+      java.util.Arrays.asList(scenario.topicPartitionCounts.keys.toSeq:_*))
+    EasyMock.expectLastCall().anyTimes()
+
+    zkClient
+  }
+
+  private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkClient: ZkClient,
+                              verifyAssignmentIsUniform: Boolean = false) {
+    val assignments = scenario.subscriptions.map{ case(consumer, subscription)  =>
+      val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkClient)
+      assignor.assign(ctx)
+    }
+
+    // check for uniqueness (i.e., any partition should be assigned to exactly one consumer stream)
+    val globalAssignment = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
+    assignments.foreach(assignment => {
+      assignment.foreach { case(topicPartition, owner) =>
+        val previousOwnerOpt = globalAssignment.put(topicPartition, owner)
+        assertTrue("Scenario %s: %s is assigned to two owners.".format(scenario, topicPartition), previousOwnerOpt.isEmpty)
+      }
+    })
+
+    // check for coverage (i.e., all given partitions are owned)
+    val assignedPartitions = globalAssignment.keySet
+    val givenPartitions = scenario.topicPartitionCounts.flatMap{ case (topic, partitionCount) =>
+      (0 until partitionCount).map(partition => TopicAndPartition(topic, partition))
+    }.toSet
+    assertTrue("Scenario %s: the list of given partitions and assigned partitions are different.".format(scenario),
+      givenPartitions == assignedPartitions)
+
+    // check for uniform assignment
+    if (verifyAssignmentIsUniform) {
+      val partitionCountForStream = partitionCountPerStream(globalAssignment)
+      val maxCount = partitionCountForStream.valuesIterator.max
+      val minCount = partitionCountForStream.valuesIterator.min
+      assertTrue("Scenario %s: assignment is not uniform (partition counts per stream are in the range [%d, %d])"
+                 .format(scenario, minCount, maxCount), (maxCount - minCount) <= 1)
+    }
+  }
+
+  /** For each consumer stream, count the number of partitions that it owns. */
+  private def partitionCountPerStream(assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) = {
+    val ownedCounts = collection.mutable.Map[ConsumerThreadId, Int]()
+    assignment.foreach { case (topicPartition, owner) =>
+      val updatedCount = ownedCounts.getOrElse(owner, 0) + 1
+      ownedCounts.put(owner, updatedCount)
+    }
+    ownedCounts
+  }
+}
+