You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/05/20 06:40:40 UTC

kafka git commit: KAFKA-2196; Remove identical topic constraint in round-robin assignor; reviewed by Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 0ad646620 -> 29419581f


KAFKA-2196; Remove identical topic constraint in round-robin assignor; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29419581
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29419581
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29419581

Branch: refs/heads/trunk
Commit: 29419581f5b28b66c7de7d5eb49b3bd6b62c9aba
Parents: 0ad6466
Author: Onur Karaman <ok...@linkedin.com>
Authored: Tue May 19 21:39:00 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 19 21:39:32 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/coordinator/PartitionAssignor.scala     | 12 ++++--------
 .../unit/kafka/coordinator/PartitionAssignorTest.scala  | 11 ++++++++---
 2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/29419581/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
index 1069822..8499bf8 100644
--- a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
@@ -66,25 +66,21 @@ private[coordinator] object PartitionAssignor {
  * The assignment will be:
  * C0 -> [t0p0, t0p2, t1p1]
  * C1 -> [t0p1, t1p0, t1p2]
- *
- * roundrobin assignment is allowed only if the set of subscribed topics is identical for every consumer within the group.
  */
 private[coordinator] class RoundRobinAssignor extends PartitionAssignor {
   override def assign(topicsPerConsumer: Map[String, Set[String]],
                       partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] = {
-    val consumersHaveIdenticalTopics = topicsPerConsumer.values.toSet.size == 1
-    require(consumersHaveIdenticalTopics,
-      "roundrobin assignment is allowed only if all consumers in the group subscribe to the same topics")
     val consumers = topicsPerConsumer.keys.toSeq.sorted
-    val topics = topicsPerConsumer.head._2
-    val consumerAssignor = CoreUtils.circularIterator(consumers)
+    val topics = topicsPerConsumer.values.flatten.toSeq.distinct.sorted
 
-    val allTopicPartitions = topics.toSeq.flatMap { topic =>
+    val allTopicPartitions = topics.flatMap { topic =>
       val numPartitionsForTopic = partitionsPerTopic(topic)
       (0 until numPartitionsForTopic).map(partition => TopicAndPartition(topic, partition))
     }
 
+    var consumerAssignor = CoreUtils.circularIterator(consumers)
     val consumerPartitionPairs = allTopicPartitions.map { topicAndPartition =>
+      consumerAssignor = consumerAssignor.dropWhile(consumerId => !topicsPerConsumer(consumerId).contains(topicAndPartition.topic))
       val consumer = consumerAssignor.next()
       (consumer, topicAndPartition)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/29419581/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
index ba6d5cd..887cee5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
@@ -259,8 +259,8 @@ class PartitionAssignorTest extends JUnitSuite {
     assertEquals(expected, actual)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
-  def testRoundRobinAssignorCannotAssignWithMixedTopics() {
+  @Test
+  def testRoundRobinAssignorMultipleConsumersMixedTopics() {
     val topic1 = "topic1"
     val topic2 = "topic2"
     val consumer1 = "consumer1"
@@ -271,7 +271,12 @@ class PartitionAssignorTest extends JUnitSuite {
     val assignor = new RoundRobinAssignor()
     val topicsPerConsumer = Map(consumer1 -> Set(topic1), consumer2 -> Set(topic1, topic2), consumer3 -> Set(topic1))
     val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions)
-    assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic)
+    val expected = Map(
+      consumer1 -> topicAndPartitions(Map(topic1 -> Set(0))),
+      consumer2 -> topicAndPartitions(Map(topic1 -> Set(1), topic2 -> Set(0, 1))),
+      consumer3 -> topicAndPartitions(Map(topic1 -> Set(2))))
+    assertEquals(expected, actual)
   }
 
   @Test