You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/05/20 06:40:40 UTC
kafka git commit: KAFKA-2196;
Remove identical topic constraint in round-robin assignor;
reviewed by Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 0ad646620 -> 29419581f
KAFKA-2196; Remove identical topic constraint in round-robin assignor; reviewed by Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29419581
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29419581
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29419581
Branch: refs/heads/trunk
Commit: 29419581f5b28b66c7de7d5eb49b3bd6b62c9aba
Parents: 0ad6466
Author: Onur Karaman <ok...@linkedin.com>
Authored: Tue May 19 21:39:00 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 19 21:39:32 2015 -0700
----------------------------------------------------------------------
.../scala/kafka/coordinator/PartitionAssignor.scala | 12 ++++--------
.../unit/kafka/coordinator/PartitionAssignorTest.scala | 11 ++++++++---
2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/29419581/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
index 1069822..8499bf8 100644
--- a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
@@ -66,25 +66,21 @@ private[coordinator] object PartitionAssignor {
* The assignment will be:
* C0 -> [t0p0, t0p2, t1p1]
* C1 -> [t0p1, t1p0, t1p2]
- *
- * roundrobin assignment is allowed only if the set of subscribed topics is identical for every consumer within the group.
*/
private[coordinator] class RoundRobinAssignor extends PartitionAssignor {
override def assign(topicsPerConsumer: Map[String, Set[String]],
partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] = {
- val consumersHaveIdenticalTopics = topicsPerConsumer.values.toSet.size == 1
- require(consumersHaveIdenticalTopics,
- "roundrobin assignment is allowed only if all consumers in the group subscribe to the same topics")
val consumers = topicsPerConsumer.keys.toSeq.sorted
- val topics = topicsPerConsumer.head._2
- val consumerAssignor = CoreUtils.circularIterator(consumers)
+ val topics = topicsPerConsumer.values.flatten.toSeq.distinct.sorted
- val allTopicPartitions = topics.toSeq.flatMap { topic =>
+ val allTopicPartitions = topics.flatMap { topic =>
val numPartitionsForTopic = partitionsPerTopic(topic)
(0 until numPartitionsForTopic).map(partition => TopicAndPartition(topic, partition))
}
+ var consumerAssignor = CoreUtils.circularIterator(consumers)
val consumerPartitionPairs = allTopicPartitions.map { topicAndPartition =>
+ consumerAssignor = consumerAssignor.dropWhile(consumerId => !topicsPerConsumer(consumerId).contains(topicAndPartition.topic))
val consumer = consumerAssignor.next()
(consumer, topicAndPartition)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/29419581/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
index ba6d5cd..887cee5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
@@ -259,8 +259,8 @@ class PartitionAssignorTest extends JUnitSuite {
assertEquals(expected, actual)
}
- @Test(expected = classOf[IllegalArgumentException])
- def testRoundRobinAssignorCannotAssignWithMixedTopics() {
+ @Test
+ def testRoundRobinAssignorMultipleConsumersMixedTopics() {
val topic1 = "topic1"
val topic2 = "topic2"
val consumer1 = "consumer1"
@@ -271,7 +271,12 @@ class PartitionAssignorTest extends JUnitSuite {
val assignor = new RoundRobinAssignor()
val topicsPerConsumer = Map(consumer1 -> Set(topic1), consumer2 -> Set(topic1, topic2), consumer3 -> Set(topic1))
val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions)
- assignor.assign(topicsPerConsumer, partitionsPerTopic)
+ val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+ val expected = Map(
+ consumer1 -> topicAndPartitions(Map(topic1 -> Set(0))),
+ consumer2 -> topicAndPartitions(Map(topic1 -> Set(1), topic2 -> Set(0, 1))),
+ consumer3 -> topicAndPartitions(Map(topic1 -> Set(2))))
+ assertEquals(expected, actual)
}
@Test