Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/30 02:55:25 UTC
git commit: KAFKA-687; Provide a roundrobin partition assignment strategy that considers partitions from all topics; reviewed by Jun Rao and Guozhang Wang.
Repository: kafka
Updated Branches:
refs/heads/trunk 538122fa7 -> 953e35b5c
KAFKA-687; Provide a roundrobin partition assignment strategy that considers partitions from all topics; reviewed by Jun Rao and Guozhang Wang.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/953e35b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/953e35b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/953e35b5
Branch: refs/heads/trunk
Commit: 953e35b5c591559a600b4e6bd01d1dd1f2e979db
Parents: 538122f
Author: Joel Koshy <jj...@gmail.com>
Authored: Fri Aug 29 17:55:14 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Aug 29 17:55:14 2014 -0700
----------------------------------------------------------------------
.../scala/kafka/consumer/ConsumerConfig.scala | 4 +
.../kafka/consumer/PartitionAssignor.scala | 161 +++++++++++++++
.../main/scala/kafka/consumer/TopicCount.scala | 39 ++--
.../consumer/ZookeeperConsumerConnector.scala | 95 ++++-----
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 2 +
core/src/main/scala/kafka/utils/ZkUtils.scala | 6 +-
.../kafka/consumer/PartitionAssignorTest.scala | 207 +++++++++++++++++++
7 files changed, 441 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 1cf2f62..efcb4af 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -49,6 +49,7 @@ object ConsumerConfig extends Config {
val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
val ExcludeInternalTopics = true
+ val DefaultPartitionAssignmentStrategy = "range" /* select between "range" and "roundrobin" */
val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
val DefaultClientId = ""
@@ -175,6 +176,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
/** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
+ /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
+ val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
+
validate(this)
}
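As a quick illustration of the new config, a consumer could opt into the round-robin strategy as in the minimal sketch below; the zookeeper.connect and group.id values are placeholders and not part of this patch.

import java.util.Properties
import kafka.consumer.ConsumerConfig

// Minimal sketch: select the new assignor via consumer properties.
// zookeeper.connect and group.id below are placeholder values.
val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("group.id", "example-group")
props.put("partition.assignment.strategy", "roundrobin") // default is "range"
val config = new ConsumerConfig(props) // assumes ConsumerConfig's public Properties constructor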
http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
new file mode 100644
index 0000000..8ea7368
--- /dev/null
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.common.TopicAndPartition
+import kafka.utils.{Utils, ZkUtils, Logging}
+
+trait PartitionAssignor {
+
+ /**
+ * Assigns partitions to consumer instances in a group.
+ * @return An assignment map of partition to consumer thread. This only includes assignments for threads that belong
+ * to the given assignment-context's consumer.
+ */
+ def assign(ctx: AssignmentContext): scala.collection.Map[TopicAndPartition, ConsumerThreadId]
+
+}
+
+object PartitionAssignor {
+ def createInstance(assignmentStrategy: String) = assignmentStrategy match {
+ case "roundrobin" => new RoundRobinAssignor()
+ case _ => new RangeAssignor()
+ }
+}
+
+class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) {
+ val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
+ val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
+ myTopicCount.getConsumerThreadIdsPerTopic
+ }
+
+ val partitionsForTopic: collection.Map[String, Seq[Int]] =
+ ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq)
+
+ val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
+ ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)
+
+ val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted
+}
+
+/**
+ * The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It
+ * then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer
+ * instances are identical, then the partitions will be uniformly distributed (i.e., the partition ownership counts
+ * will differ by at most one across all consumer threads).
+ *
+ * (For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance
+ * and thread-id within that instance. Therefore, round-robin assignment is allowed only if:
+ * a) Every topic has the same number of streams within a consumer instance
+ * b) The set of subscribed topics is identical for every consumer instance within the group.
+ */
+
+class RoundRobinAssignor() extends PartitionAssignor with Logging {
+
+ def assign(ctx: AssignmentContext) = {
+ val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
+
+ // check conditions (a) and (b)
+ val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet)
+ ctx.consumersForTopic.foreach { case (topic, threadIds) =>
+ val threadIdSet = threadIds.toSet
+ require(threadIdSet == headThreadIdSet,
+ "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " +
+ "AND if the stream counts across topics are identical for a given consumer instance.\n" +
+ "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) +
+ "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet))
+ }
+
+ val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted)
+
+ info("Starting round-robin assignment with consumers " + ctx.consumers)
+ val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
+ info("Consumer %s rebalancing the following partitions for topic %s: %s"
+ .format(ctx.consumerId, topic, partitions))
+ partitions.map(partition => {
+ TopicAndPartition(topic, partition)
+ })
+ }.toSeq.sortWith((topicPartition1, topicPartition2) => {
+ /*
+ * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
+ * up on one consumer (if it has a high enough stream count).
+ */
+ topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
+ })
+
+ allTopicPartitions.foreach(topicPartition => {
+ val threadId = threadAssignor.next()
+ if (threadId.consumer == ctx.consumerId)
+ partitionOwnershipDecision += (topicPartition -> threadId)
+ })
+
+ partitionOwnershipDecision
+ }
+}
+
+/**
+ * Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
+ * and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of
+ * consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly
+ * divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1
+ * and C2, each with two streams, and five available partitions (p0, p1, p2, p3, p4). Each consumer thread
+ * will get at least one partition, and the first consumer thread will get one extra partition. The assignment will therefore be:
+ * p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1
+ */
+class RangeAssignor() extends PartitionAssignor with Logging {
+
+ def assign(ctx: AssignmentContext) = {
+ val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
+
+ for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
+ val curConsumers = ctx.consumersForTopic(topic)
+ val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)
+
+ val nPartsPerConsumer = curPartitions.size / curConsumers.size
+ val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
+
+ info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
+ " for topic " + topic + " with consumers: " + curConsumers)
+
+ for (consumerThreadId <- consumerThreadIdSet) {
+ val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
+ assert(myConsumerPosition >= 0)
+ val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
+ val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
+
+ /**
+ * Range-partition the sorted partitions to consumers for better locality.
+ * The first few consumers pick up an extra partition, if any.
+ */
+ if (nParts <= 0)
+ warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
+ else {
+ for (i <- startPart until startPart + nParts) {
+ val partition = curPartitions(i)
+ info(consumerThreadId + " attempting to claim partition " + partition)
+ // record the partition ownership decision
+ partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
+ }
+ }
+ }
+ }
+
+ partitionOwnershipDecision
+ }
+}
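To make the round-robin layout described in the class comment concrete, the following self-contained sketch mimics the same steps. It does not call the assignor itself (which builds its context from ZooKeeper); the topic names and thread ids are made up for illustration.

// Sketch of the round-robin layout: pool all partitions across topics,
// sort the candidate thread ids, then deal partitions out circularly.
// Topic names and thread ids here are hypothetical.
val partitionsPerTopic = Map("topic-a" -> 3, "topic-b" -> 2)
val allTopicPartitions = partitionsPerTopic.toSeq.flatMap { case (topic, count) =>
  (0 until count).map(partition => (topic, partition))
}.sortBy(_.toString.hashCode) // same hashcode shuffle as RoundRobinAssignor

val threadIds = Seq("c1-0", "c1-1", "c2-0").sorted
val circular = Stream.continually(threadIds).flatten.iterator // like Utils.circularIterator

allTopicPartitions.foreach { case (topic, partition) =>
  println("%s-%d -> %s".format(topic, partition, circular.next()))
}

With identical subscriptions, each of the three threads ends up owning either one or two of the five partitions, i.e. the ownership counts differ by at most one.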
http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 8b0ae57..0954b3c 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -24,28 +24,37 @@ import kafka.common.KafkaException
private[kafka] trait TopicCount {
- def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
+ def getConsumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]]
def getTopicCountMap: Map[String, Int]
def pattern: String
-
- protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
- topicCountMap: Map[String, Int]) = {
- val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
+
+}
+
+case class ConsumerThreadId(consumer: String, threadId: Int) extends Ordered[ConsumerThreadId] {
+ override def toString = "%s-%d".format(consumer, threadId)
+
+ def compare(that: ConsumerThreadId) = toString.compare(that.toString)
+}
+
+private[kafka] object TopicCount extends Logging {
+ val whiteListPattern = "white_list"
+ val blackListPattern = "black_list"
+ val staticPattern = "static"
+
+ def makeThreadId(consumerIdString: String, threadId: Int) = consumerIdString + "-" + threadId
+
+ def makeConsumerThreadIdsPerTopic(consumerIdString: String,
+ topicCountMap: Map[String, Int]) = {
+ val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]()
for ((topic, nConsumers) <- topicCountMap) {
- val consumerSet = new mutable.HashSet[String]
+ val consumerSet = new mutable.HashSet[ConsumerThreadId]
assert(nConsumers >= 1)
for (i <- 0 until nConsumers)
- consumerSet += consumerIdString + "-" + i
+ consumerSet += ConsumerThreadId(consumerIdString, i)
consumerThreadIdsPerTopicMap.put(topic, consumerSet)
}
consumerThreadIdsPerTopicMap
}
-}
-
-private[kafka] object TopicCount extends Logging {
- val whiteListPattern = "white_list"
- val blackListPattern = "black_list"
- val staticPattern = "static"
def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient, excludeInternalTopics: Boolean) : TopicCount = {
val dirs = new ZKGroupDirs(group)
@@ -101,7 +110,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
val topicCountMap: Map[String, Int])
extends TopicCount {
- def getConsumerThreadIdsPerTopic = makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
+ def getConsumerThreadIdsPerTopic = TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
override def equals(obj: Any): Boolean = {
obj match {
@@ -124,7 +133,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
def getConsumerThreadIdsPerTopic = {
val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
.filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics))
- makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
+ TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
}
def getTopicCountMap = Map(Utils.JSONEscapeString(topicFilter.regex) -> numStreams)
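Worth noting when reading the assignors: ConsumerThreadId delegates compare to its "consumer-threadId" string form, so sorting is lexicographic rather than numeric on the thread index. A small illustrative sketch:

import kafka.consumer.ConsumerThreadId

// Ordering follows toString ("consumer-threadId"), so "c1-10" sorts before "c1-2".
val ids = Seq(ConsumerThreadId("c1", 2), ConsumerThreadId("c1", 10), ConsumerThreadId("c1", 1))
println(ids.sorted.mkString(", ")) // c1-1, c1-10, c1-2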
http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index acfd064..21f3e00 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -90,7 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
private val checkpointedOffsets = new Pool[TopicAndPartition, Long]
- private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
+ private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
private val messageStreamCreated = new AtomicBoolean(false)
@@ -514,9 +514,21 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
extends IZkChildListener {
+
+ private val partitionAssignor = PartitionAssignor.createInstance(config.partitionAssignmentStrategy)
+
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
+
+ @volatile private var allTopicsOwnedPartitionsCount = 0
+ newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] {
+ def value() = allTopicsOwnedPartitionsCount
+ })
+
+ private def ownedPartitionsCountMetricName(topic: String) =
+ "%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic)
+
private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
override def run() {
info("starting watcher executor thread for consumer " + consumerIdString)
@@ -565,10 +577,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]])= {
info("Releasing partition ownership")
for ((topic, infos) <- localTopicRegistry) {
- for(partition <- infos.keys)
+ for(partition <- infos.keys) {
deletePartitionOwnershipFromZK(topic, partition)
+ }
+ removeMetric(ownedPartitionsCountMetricName(topic))
localTopicRegistry.remove(topic)
}
+ allTopicsOwnedPartitionsCount = 0
}
def resetState() {
@@ -618,7 +633,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def rebalance(cluster: Cluster): Boolean = {
val myTopicThreadIdsMap = TopicCount.constructTopicCount(
group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
- val consumersPerTopicMap = getConsumersPerTopic(zkClient, group, config.excludeInternalTopics)
val brokers = getAllBrokersInCluster(zkClient)
if (brokers.size == 0) {
// This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
@@ -629,9 +643,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
true
}
else {
- val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq)
- val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq.sorted))
-
/**
* fetchers must be stopped to avoid data duplication, since if the current
* rebalancing attempt fails, the partitions that are released could be owned by another consumer.
@@ -642,67 +653,41 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
releasePartitionOwnership(topicRegistry)
- var partitionOwnershipDecision = new collection.mutable.HashMap[TopicAndPartition, String]()
- val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
-
- for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
- currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
-
- val curConsumers = consumersPerTopicMap.get(topic).get
- val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
-
- val nPartsPerConsumer = curPartitions.size / curConsumers.size
- val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
-
- info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
- " for topic " + topic + " with consumers: " + curConsumers)
-
- for (consumerThreadId <- consumerThreadIdSet) {
- val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
- assert(myConsumerPosition >= 0)
- val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
- val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
-
- /**
- * Range-partition the sorted partitions to consumers for better locality.
- * The first few consumers pick up an extra partition, if any.
- */
- if (nParts <= 0)
- warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
- else {
- for (i <- startPart until startPart + nParts) {
- val partition = curPartitions(i)
- info(consumerThreadId + " attempting to claim partition " + partition)
- // record the partition ownership decision
- partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
- }
- }
- }
- }
+ val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
+ val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
+ val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
+ valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
// fetch current offsets for all topic-partitions
val topicPartitions = partitionOwnershipDecision.keySet.toSeq
+
val offsetFetchResponseOpt = fetchOffsets(topicPartitions)
if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
false
else {
val offsetFetchResponse = offsetFetchResponseOpt.get
- topicPartitions.foreach { topicAndPartition =>
+ topicPartitions.foreach(topicAndPartition => {
val (topic, partition) = topicAndPartition.asTuple
val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
val threadId = partitionOwnershipDecision(topicAndPartition)
addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
- }
+ })
/**
* move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
* A rebalancing attempt is completed successfully only after the fetchers have been started correctly
*/
- if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
- info("Updating the cache")
- debug("Partitions per topic cache " + partitionsPerTopicMap)
- debug("Consumers per topic cache " + consumersPerTopicMap)
+ if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
+ allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size
+
+ partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
+ .foreach { case (topic, partitionThreadPairs) =>
+ newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] {
+ def value() = partitionThreadPairs.size
+ })
+ }
+
topicRegistry = currentTopicRegistry
updateFetcher(cluster)
true
@@ -753,7 +738,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]],
- relevantTopicThreadIdsMap: Map[String, Set[String]]) {
+ relevantTopicThreadIdsMap: Map[String, Set[ConsumerThreadId]]) {
// only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
// after this rebalancing attempt
val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
@@ -776,7 +761,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
- private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, String]): Boolean = {
+ private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, ConsumerThreadId]): Boolean = {
var successfullyOwnedPartitions : List[(String, Int)] = Nil
val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
val topic = partitionOwner._1.topic
@@ -784,7 +769,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val consumerThreadId = partitionOwner._2
val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition)
try {
- createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
+ createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId.toString)
info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
successfullyOwnedPartitions ::= (topic, partition)
true
@@ -808,8 +793,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
partition: Int, topic: String,
- offset: Long, consumerThreadId: String) {
- val partTopicInfoMap = currentTopicRegistry.get(topic)
+ offset: Long, consumerThreadId: ConsumerThreadId) {
+ val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic)
val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
@@ -852,7 +837,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
// map of {topic -> Set(thread-1, thread-2, ...)}
- val consumerThreadIdsPerTopic: Map[String, Set[String]] =
+ val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
topicCount.getConsumerThreadIdsPerTopic
val allQueuesAndStreams = topicCount match {
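The ownership gauges introduced above follow a fixed naming pattern; the sketch below spells it out with placeholder clientId, groupId, and topic values that are not taken from the patch.

// Placeholder values for illustration only.
val clientId = "c1"
val groupId = "g1"

// Registered once per rebalance listener:
println("%s-%s-AllTopicsOwnedPartitionsCount".format(clientId, groupId))
// Registered per owned topic after a successful rebalance, and removed when
// partition ownership is released:
println("%s-%s-%s-OwnedPartitionsCount".format(clientId, groupId, "topic-a"))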
http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 00df462..2313a57 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -72,6 +72,8 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
+ new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"),
+ new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"),
// kafka.consumer.ConsumerFetcherManager
new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),
http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index dcdc1ce..a7b1fdc 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -18,7 +18,7 @@
package kafka.utils
import kafka.cluster.{Broker, Cluster}
-import kafka.consumer.TopicCount
+import kafka.consumer.{ConsumerThreadId, TopicCount}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
ZkMarshallingError, ZkBadVersionException}
@@ -658,10 +658,10 @@ object ZkUtils extends Logging {
getChildren(zkClient, dirs.consumerRegistryDir)
}
- def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[String]] = {
+ def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = {
val dirs = new ZKGroupDirs(group)
val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
- val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
+ val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
for (consumer <- consumers) {
val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient, excludeInternalTopics)
for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/953e35b5/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
new file mode 100644
index 0000000..9ceae22
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.consumer
+
+import org.scalatest.junit.JUnit3Suite
+import org.easymock.EasyMock
+import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.data.Stat
+import kafka.consumer._
+import kafka.utils.{TestUtils, Logging, ZkUtils, Json}
+import junit.framework.Assert._
+import kafka.common.TopicAndPartition
+import unit.kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
+import kafka.consumer.ConsumerThreadId
+import unit.kafka.consumer.PartitionAssignorTest.Scenario
+import unit.kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo
+
+class PartitionAssignorTest extends JUnit3Suite with Logging {
+
+ def testRoundRobinPartitionAssignor() {
+ val assignor = new RoundRobinAssignor
+
+ /** various scenarios with only wildcard consumers */
+ (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+ val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+ val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
+
+ val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+ ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
+ }).toSeq:_*)
+
+ val subscriptions = Map((1 to consumerCount).map(consumer => {
+ val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
+ ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))
+ }).toSeq:_*)
+ val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+ val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
+ EasyMock.replay(zkClient)
+ PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient, verifyAssignmentIsUniform = true)
+ })
+ }
+
+ def testRangePartitionAssignor() {
+ val assignor = new RangeAssignor
+ (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+ val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+ val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
+
+ val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+ ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
+ }).toSeq:_*)
+
+ val subscriptions = Map((1 to consumerCount).map(consumer => {
+ val streamCounts = Map((1 to topicCount).map(topic => {
+ val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
+ ("topic-" + topic, streamCount)
+ }).toSeq:_*)
+ ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
+ }).toSeq:_*)
+ val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+ val zkClient = PartitionAssignorTest.setupZkClientMock(scenario)
+ EasyMock.replay(zkClient)
+
+ PartitionAssignorTest.assignAndVerify(scenario, assignor, zkClient)
+ })
+ }
+}
+
+private object PartitionAssignorTest extends Logging {
+
+ private val TestCaseCount = 3
+ private val MaxConsumerCount = 10
+ private val MaxStreamCount = 8
+ private val MaxTopicCount = 100
+ private val MinTopicCount = 20
+ private val MaxPartitionCount = 120
+ private val MinPartitionCount = 8
+
+ private trait SubscriptionInfo {
+ def registrationString: String
+ }
+
+ private case class StaticSubscriptionInfo(streamCounts: Map[String, Int]) extends SubscriptionInfo {
+ def registrationString =
+ Json.encode(Map("version" -> 1,
+ "subscription" -> streamCounts,
+ "pattern" -> "static",
+ "timestamp" -> 1234.toString))
+
+ override def toString = {
+ "Stream counts: " + streamCounts
+ }
+ }
+
+ private case class WildcardSubscriptionInfo(streamCount: Int, regex: String, isWhitelist: Boolean)
+ extends SubscriptionInfo {
+ def registrationString =
+ Json.encode(Map("version" -> 1,
+ "subscription" -> Map(regex -> streamCount),
+ "pattern" -> (if (isWhitelist) "white_list" else "black_list")))
+
+ override def toString = {
+ "\"%s\":%d (%s)".format(regex, streamCount, if (isWhitelist) "whitelist" else "blacklist")
+ }
+ }
+
+ private case class Scenario(group: String,
+ topicPartitionCounts: Map[String, Int],
+ /* consumerId -> SubscriptionInfo */
+ subscriptions: Map[String, SubscriptionInfo]) {
+ override def toString = {
+ "\n" +
+ "Group : %s\n".format(group) +
+ "Topic partition counts : %s\n".format(topicPartitionCounts) +
+ "Consumer subscriptions : %s\n".format(subscriptions)
+ }
+ }
+
+ private def setupZkClientMock(scenario: Scenario) = {
+ val consumers = java.util.Arrays.asList(scenario.subscriptions.keys.toSeq:_*)
+
+ val zkClient = EasyMock.createStrictMock(classOf[ZkClient])
+ EasyMock.checkOrder(zkClient, false)
+
+ EasyMock.expect(zkClient.getChildren("/consumers/%s/ids".format(scenario.group))).andReturn(consumers)
+ EasyMock.expectLastCall().anyTimes()
+
+ scenario.subscriptions.foreach { case(consumerId, subscriptionInfo) =>
+ EasyMock.expect(zkClient.readData("/consumers/%s/ids/%s".format(scenario.group, consumerId), new Stat()))
+ .andReturn(subscriptionInfo.registrationString)
+ EasyMock.expectLastCall().anyTimes()
+ }
+
+ scenario.topicPartitionCounts.foreach { case(topic, partitionCount) =>
+ val replicaAssignment = Map((0 until partitionCount).map(partition => (partition.toString, Seq(0))):_*)
+ EasyMock.expect(zkClient.readData("/brokers/topics/%s".format(topic), new Stat()))
+ .andReturn(ZkUtils.replicaAssignmentZkData(replicaAssignment))
+ EasyMock.expectLastCall().anyTimes()
+ }
+
+ EasyMock.expect(zkClient.getChildren("/brokers/topics")).andReturn(
+ java.util.Arrays.asList(scenario.topicPartitionCounts.keys.toSeq:_*))
+ EasyMock.expectLastCall().anyTimes()
+
+ zkClient
+ }
+
+ private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkClient: ZkClient,
+ verifyAssignmentIsUniform: Boolean = false) {
+ val assignments = scenario.subscriptions.map{ case(consumer, subscription) =>
+ val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkClient)
+ assignor.assign(ctx)
+ }
+
+ // check for uniqueness (i.e., any partition should be assigned to exactly one consumer stream)
+ val globalAssignment = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
+ assignments.foreach(assignment => {
+ assignment.foreach { case(topicPartition, owner) =>
+ val previousOwnerOpt = globalAssignment.put(topicPartition, owner)
+ assertTrue("Scenario %s: %s is assigned to two owners.".format(scenario, topicPartition), previousOwnerOpt.isEmpty)
+ }
+ })
+
+ // check for coverage (i.e., all given partitions are owned)
+ val assignedPartitions = globalAssignment.keySet
+ val givenPartitions = scenario.topicPartitionCounts.flatMap{ case (topic, partitionCount) =>
+ (0 until partitionCount).map(partition => TopicAndPartition(topic, partition))
+ }.toSet
+ assertTrue("Scenario %s: the list of given partitions and assigned partitions are different.".format(scenario),
+ givenPartitions == assignedPartitions)
+
+ // check for uniform assignment
+ if (verifyAssignmentIsUniform) {
+ val partitionCountForStream = partitionCountPerStream(globalAssignment)
+ val maxCount = partitionCountForStream.valuesIterator.max
+ val minCount = partitionCountForStream.valuesIterator.min
+ assertTrue("Scenario %s: assignment is not uniform (partition counts per stream are in the range [%d, %d])"
+ .format(scenario, minCount, maxCount), (maxCount - minCount) <= 1)
+ }
+ }
+
+ /** For each consumer stream, count the number of partitions that it owns. */
+ private def partitionCountPerStream(assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) = {
+ val ownedCounts = collection.mutable.Map[ConsumerThreadId, Int]()
+ assignment.foreach { case (topicPartition, owner) =>
+ val updatedCount = ownedCounts.getOrElse(owner, 0) + 1
+ ownedCounts.put(owner, updatedCount)
+ }
+ ownedCounts
+ }
+}
+