You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/04 19:05:50 UTC

kafka git commit: KAFKA-2737: Added single- and multi-consumer integration tests for round-robin assignment

Repository: kafka
Updated Branches:
  refs/heads/trunk 421de0a3f -> aa73554c1


KAFKA-2737: Added single- and multi-consumer integration tests for round-robin assignment

Two tests:
1. One consumer subscribes to 2 topics, each with 2 partitions; includes adding and removing a topic.
2. Several consumers subscribe to 2 topics with several partitions each; includes adding one more consumer after the initial assignment is done and verified.

Author: Anna Povzner <an...@confluent.io>

Reviewers: Guozhang Wang

Closes #413 from apovzner/cpkafka-76


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa73554c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa73554c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa73554c

Branch: refs/heads/trunk
Commit: aa73554c19b15a1589a77a4fae85c2d66a649acf
Parents: 421de0a
Author: Anna Povzner <an...@confluent.io>
Authored: Wed Nov 4 10:11:31 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 4 10:11:31 2015 -0800

----------------------------------------------------------------------
 .../kafka/api/BaseConsumerTest.scala            |  54 ++++++++-
 .../kafka/api/PlaintextConsumerTest.scala       | 117 ++++++++++++++++++-
 2 files changed, 169 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aa73554c/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 9487c77..2e674af 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,13 +19,14 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 
-import kafka.utils.{TestUtils, Logging}
+import kafka.utils.{TestUtils, Logging, ShutdownableThread}
 import kafka.server.KafkaConfig
 
 import java.util.ArrayList
 import org.junit.Assert._
 import org.junit.{Test, Before}
 
+import scala.collection.mutable.Buffer
 import scala.collection.JavaConverters._
 import kafka.coordinator.GroupCoordinator
 
@@ -315,4 +316,55 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1
   }
 
+  protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]]) extends ShutdownableThread("daemon-consumer-assignment", false)
+  {
+    @volatile private var partitionAssignment: Set[TopicPartition] = Set.empty[TopicPartition]
+
+    def consumerAssignment(): Set[TopicPartition] = {
+      partitionAssignment
+    }
+
+    override def doWork(): Unit = {
+      consumer.poll(50)
+      if (consumer.assignment() != partitionAssignment.asJava) {
+        partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*)
+      }
+      Thread.sleep(100L)
+    }
+  }
+
+  /**
+   * Check whether partition assignment is valid
+   * Assumes partition assignment is valid iff
+   * 1. Every consumer got assigned at least one partition
+   * 2. Each partition is assigned to only one consumer
+   * 3. Every partition is assigned to one of the consumers
+   *
+   * @param assignments collection of consumer assignments, one per consumer
+   * @param partitions set of partitions that consumers subscribed to
+   * @return true if partition assignment is valid
+   */
+  def isPartitionAssignmentValid(assignments: Buffer[Set[TopicPartition]],
+                                 partitions: Set[TopicPartition]): Boolean = {
+    val allNonEmptyAssignments = assignments forall (assignment => assignment.size > 0)
+    if (!allNonEmptyAssignments) {
+      // at least one consumer got empty assignment
+      return false
+    }
+
+    // make sure that sum of all partitions to all consumers equals total number of partitions
+    val totalPartitionsInAssignments = (0 /: assignments) (_ + _.size)
+    if (totalPartitionsInAssignments != partitions.size) {
+      // either same partitions got assigned to more than one consumer or some
+      // partitions were not assigned
+      return false
+    }
+
+    // The above checks could miss the case where one or more partitions were assigned to more
+    // than one consumer and the same number of partitions were missing from assignments.
+    // Make sure that all unique assignments are the same as 'partitions'
+    val uniqueAssignedPartitions = (Set[TopicPartition]() /: assignments) (_ ++ _)
+    uniqueAssignedPartitions == partitions
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa73554c/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 6c7a653..2e7471c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -15,13 +15,14 @@ package kafka.api
 import java.util.regex.Pattern
 
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig, RoundRobinAssignor}
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordTooLargeException}
 import org.junit.Assert._
 import org.junit.Test
+import scala.collection.mutable.Buffer
 import scala.collection.JavaConverters
 import JavaConverters._
 
@@ -366,4 +367,118 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer0.close()
   }
+
+  @Test
+  def testRoundRobinAssignment() {
+    // 1 consumer using round-robin assignment
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName)
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    // create two new topics, each having 2 partitions
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    val expectedAssignment = createTopicAndSendRecords(topic1, 2, 100) ++ createTopicAndSendRecords(topic2, 2, 100)
+
+    assertEquals(0, consumer0.assignment().size)
+
+    // subscribe to two topics
+    consumer0.subscribe(List(topic1, topic2).asJava)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == expectedAssignment.asJava
+    }, s"Expected partitions ${expectedAssignment.asJava} but actually got ${consumer0.assignment()}")
+
+    // add one more topic with 2 partitions
+    val topic3 = "topic3"
+    createTopicAndSendRecords(topic3, 2, 100)
+
+    val newExpectedAssignment = expectedAssignment ++ Set(new TopicPartition(topic3, 0), new TopicPartition(topic3, 1))
+    consumer0.subscribe(List(topic1, topic2, topic3).asJava)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == newExpectedAssignment.asJava
+    }, s"Expected partitions ${newExpectedAssignment.asJava} but actually got ${consumer0.assignment()}")
+
+    // remove the topic we just added
+    consumer0.subscribe(List(topic1, topic2).asJava)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == expectedAssignment.asJava
+    }, s"Expected partitions ${expectedAssignment.asJava} but actually got ${consumer0.assignment()}")
+
+    consumer0.unsubscribe()
+    assertEquals(0, consumer0.assignment().size)
+  }
+
+  @Test
+  def testMultiConsumerRoundRobinAssignment() {
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName)
+
+    val consumerCount = 10
+    val rrConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+    for (i <- 0 until consumerCount) {
+      rrConsumers += new KafkaConsumer(this.consumerConfig)
+    }
+
+    // create two new topics, total number of partitions must be greater than number of consumers
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    val subscriptions = createTopicAndSendRecords(topic1, 5, 100) ++ createTopicAndSendRecords(topic2, 8, 100)
+
+    // all consumers subscribe to all the topics and start polling
+    // for the topic partition assignment
+    val consumerPollers = Buffer[ConsumerAssignmentPoller]()
+    for (consumer <- rrConsumers) {
+      assertEquals(0, consumer.assignment().size)
+      consumer.subscribe(List(topic1, topic2).asJava)
+      val poller = new ConsumerAssignmentPoller(consumer)
+      consumerPollers += poller
+      poller.start()
+    }
+
+    TestUtils.waitUntilTrue(() => {
+      val assignments = Buffer[Set[TopicPartition]]()
+      consumerPollers.foreach(assignments += _.consumerAssignment())
+      isPartitionAssignmentValid(assignments, subscriptions)
+    }, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+
+    // add one more consumer
+    val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+    newConsumer.subscribe(List(topic1, topic2).asJava)
+    val newPoller = new ConsumerAssignmentPoller(newConsumer)
+    rrConsumers += newConsumer
+    consumerPollers += newPoller
+    newPoller.start()
+
+    // wait until topics get re-assigned
+    TestUtils.waitUntilTrue(() => {
+      val assignments = Buffer[Set[TopicPartition]]()
+      consumerPollers.foreach(assignments += _.consumerAssignment())
+      isPartitionAssignmentValid(assignments, subscriptions)
+    }, s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added one more consumer")
+
+    for (poller <- consumerPollers)
+      poller.shutdown()
+
+    for (consumer <- rrConsumers) {
+      consumer.unsubscribe()
+    }
+  }
+
+  /**
+   * Creates topic 'topicName' with 'numPartitions' partitions and produces 'recordsPerPartition'
+   * records to each partition
+   */
+  def createTopicAndSendRecords(topicName: String, numPartitions: Int, recordsPerPartition: Int): Set[TopicPartition] = {
+    TestUtils.createTopic(this.zkUtils, topicName, numPartitions, serverCount, this.servers)
+    var parts = Set[TopicPartition]()
+    for (partition <- 0 until numPartitions) {
+      val tp = new TopicPartition(topicName, partition)
+      sendRecords(recordsPerPartition, tp)
+      parts = parts + tp
+    }
+    parts
+  }
 }