You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/04 19:05:50 UTC
kafka git commit: KAFKA-2737: Added single- and multi-consumer
integration tests for round-robin assignment
Repository: kafka
Updated Branches:
refs/heads/trunk 421de0a3f -> aa73554c1
KAFKA-2737: Added single- and multi-consumer integration tests for round-robin assignment
Two tests:
1. One consumer subscribes to 2 topics, each with 2 partitions; includes adding and removing a topic.
2. Several consumers subscribe to 2 topics, several partitions each; includes adding one more consumer after the initial assignment is done and verified.
Author: Anna Povzner <an...@confluent.io>
Reviewers: Guozhang Wang
Closes #413 from apovzner/cpkafka-76
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa73554c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa73554c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa73554c
Branch: refs/heads/trunk
Commit: aa73554c19b15a1589a77a4fae85c2d66a649acf
Parents: 421de0a
Author: Anna Povzner <an...@confluent.io>
Authored: Wed Nov 4 10:11:31 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 4 10:11:31 2015 -0800
----------------------------------------------------------------------
.../kafka/api/BaseConsumerTest.scala | 54 ++++++++-
.../kafka/api/PlaintextConsumerTest.scala | 117 ++++++++++++++++++-
2 files changed, 169 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa73554c/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 9487c77..2e674af 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,13 +19,14 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
-import kafka.utils.{TestUtils, Logging}
+import kafka.utils.{TestUtils, Logging, ShutdownableThread}
import kafka.server.KafkaConfig
import java.util.ArrayList
import org.junit.Assert._
import org.junit.{Test, Before}
+import scala.collection.mutable.Buffer
import scala.collection.JavaConverters._
import kafka.coordinator.GroupCoordinator
@@ -315,4 +316,55 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1
}
+ /**
+  * Background thread that repeatedly polls the given consumer and caches the consumer's
+  * most recent partition assignment so that another thread can read it.
+  *
+  * @param consumer consumer to poll; presumably it must not be used by any other thread
+  *                 while this poller is running -- TODO confirm against KafkaConsumer docs
+  */
+ protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]]) extends ShutdownableThread("daemon-consumer-assignment", false)
+ {
+   // Written only from doWork(); @volatile so that readers on other threads see the update.
+   @volatile private var partitionAssignment: Set[TopicPartition] = Set.empty[TopicPartition]
+
+   /** @return the assignment observed by the most recent poll iteration */
+   def consumerAssignment(): Set[TopicPartition] = partitionAssignment
+
+   /** One iteration: poll, refresh the cached assignment if it changed, then pause briefly. */
+   override def doWork(): Unit = {
+     consumer.poll(50)
+     val latest = consumer.assignment()
+     // Take an immutable snapshot only when the assignment actually differs from the cached one.
+     if (latest != partitionAssignment.asJava)
+       partitionAssignment = latest.asScala.toSet
+     Thread.sleep(100L)
+   }
+ }
+
+ /**
+  * Decides whether the given per-consumer assignments form a valid partition assignment.
+  * The assignment is considered valid iff
+  * 1. no consumer has an empty assignment,
+  * 2. no partition is assigned to more than one consumer, and
+  * 3. every partition in 'partitions' is assigned to some consumer.
+  *
+  * @param assignments set of consumer assignments; one per each consumer
+  * @param partitions set of partitions that consumers subscribed to
+  * @return true if partition assignment is valid
+  */
+ def isPartitionAssignmentValid(assignments: Buffer[Set[TopicPartition]],
+                                partitions: Set[TopicPartition]): Boolean = {
+   // 1. every consumer must own at least one partition
+   val noConsumerIsIdle = assignments.forall(_.nonEmpty)
+
+   // If the assignment sizes do not add up to the number of partitions, then either some
+   // partition was handed to several consumers or some partition was left unassigned.
+   val assignedCount = assignments.map(_.size).sum
+   val countMatches = assignedCount == partitions.size
+
+   // The count alone cannot detect k duplicated partitions offset by k missing ones,
+   // so the union of all assignments must also be exactly 'partitions'.
+   val assignedPartitions = assignments.foldLeft(Set.empty[TopicPartition])(_ ++ _)
+
+   noConsumerIsIdle && countMatches && assignedPartitions == partitions
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa73554c/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 6c7a653..2e7471c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -15,13 +15,14 @@ package kafka.api
import java.util.regex.Pattern
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig, RoundRobinAssignor}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordTooLargeException}
import org.junit.Assert._
import org.junit.Test
+import scala.collection.mutable.Buffer
import scala.collection.JavaConverters
import JavaConverters._
@@ -366,4 +367,118 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer0.close()
}
+
+ /**
+  * A single consumer using the round-robin assignor must end up owning every partition of the
+  * topics it is subscribed to, including after a topic is added to and then removed from the
+  * subscription.
+  */
+ @Test
+ def testRoundRobinAssignment() {
+   // 1 consumer using round-robin assignment
+   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group")
+   this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName)
+   val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+   // Keeps polling until consumer0's assignment equals 'expected'; fails the test on timeout.
+   def awaitAssignment(expected: Set[TopicPartition]): Unit = {
+     TestUtils.waitUntilTrue(() => {
+       consumer0.poll(50)
+       consumer0.assignment() == expected.asJava
+     }, s"Expected partitions ${expected.asJava} but actually got ${consumer0.assignment()}")
+   }
+
+   try {
+     // create two new topics, each having 2 partitions
+     val topic1 = "topic1"
+     val topic2 = "topic2"
+     val expectedAssignment = createTopicAndSendRecords(topic1, 2, 100) ++ createTopicAndSendRecords(topic2, 2, 100)
+
+     assertEquals(0, consumer0.assignment().size)
+
+     // subscribe to two topics
+     consumer0.subscribe(List(topic1, topic2).asJava)
+     awaitAssignment(expectedAssignment)
+
+     // add one more topic with 2 partitions
+     val topic3 = "topic3"
+     createTopicAndSendRecords(topic3, 2, 100)
+
+     val newExpectedAssignment = expectedAssignment ++ Set(new TopicPartition(topic3, 0), new TopicPartition(topic3, 1))
+     consumer0.subscribe(List(topic1, topic2, topic3).asJava)
+     awaitAssignment(newExpectedAssignment)
+
+     // remove the topic we just added
+     consumer0.subscribe(List(topic1, topic2).asJava)
+     awaitAssignment(expectedAssignment)
+
+     consumer0.unsubscribe()
+     assertEquals(0, consumer0.assignment().size)
+   } finally {
+     // Release the consumer even when an assertion above fails, so it does not leak
+     // into the tests that run afterwards.
+     consumer0.close()
+   }
+ }
+
+ /**
+  * Several consumers in one group using the round-robin assignor must reach a valid
+  * assignment (see isPartitionAssignmentValid), both initially and after one more consumer
+  * joins the group.
+  */
+ @Test
+ def testMultiConsumerRoundRobinAssignment() {
+   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group")
+   this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName)
+
+   val consumerCount = 10
+   val topic1 = "topic1"
+   val topic2 = "topic2"
+   val rrConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+   val consumerPollers = Buffer[ConsumerAssignmentPoller]()
+
+   // Subscribes 'consumer' to both topics and starts a background poller for it.
+   def subscribeAndStartPolling(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
+     consumer.subscribe(List(topic1, topic2).asJava)
+     val poller = new ConsumerAssignmentPoller(consumer)
+     poller.start()
+     // Register only after a successful start so that cleanup never waits on a thread
+     // that was never running.
+     consumerPollers += poller
+   }
+
+   // Waits until the assignments reported by all pollers together form a valid assignment.
+   def awaitValidAssignment(partitions: Set[TopicPartition], failureMessage: String): Unit = {
+     TestUtils.waitUntilTrue(() => {
+       val assignments = consumerPollers.map(_.consumerAssignment())
+       isPartitionAssignmentValid(assignments, partitions)
+     }, failureMessage)
+   }
+
+   try {
+     try {
+       for (_ <- 0 until consumerCount)
+         rrConsumers += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+
+       // create two new topics, total number of partitions must be greater than number of consumers
+       val subscriptions = createTopicAndSendRecords(topic1, 5, 100) ++ createTopicAndSendRecords(topic2, 8, 100)
+
+       // all consumers subscribe to all the topics and start polling
+       // for the topic partition assignment
+       for (consumer <- rrConsumers) {
+         assertEquals(0, consumer.assignment().size)
+         subscribeAndStartPolling(consumer)
+       }
+
+       awaitValidAssignment(subscriptions,
+         s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+
+       // add one more consumer; track it first so it is closed even if subscribing fails
+       val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+       rrConsumers += newConsumer
+       subscribeAndStartPolling(newConsumer)
+
+       // wait until topics get re-assigned
+       awaitValidAssignment(subscriptions,
+         s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added one more consumer")
+     } finally {
+       // Stop the poller threads even when a wait above timed out; otherwise they would
+       // keep polling in the background of subsequent tests.
+       // NOTE(review): relies on shutdown() blocking until the thread has exited -- confirm.
+       consumerPollers.foreach(_.shutdown())
+     }
+
+     // The pollers are stopped, so the consumers can be used from this thread again.
+     rrConsumers.foreach(_.unsubscribe())
+   } finally {
+     // Always release the consumers, on success as well as on failure.
+     rrConsumers.foreach(_.close())
+   }
+ }
+
+ /**
+  * Creates topic 'topicName' with 'numPartitions' partitions and produces
+  * 'recordsPerPartition' records to each of them, in partition order.
+  *
+  * @param topicName name of the topic to create
+  * @param numPartitions number of partitions the topic is created with
+  * @param recordsPerPartition number of records sent to every partition
+  * @return the set of all partitions of the newly created topic
+  */
+ def createTopicAndSendRecords(topicName: String, numPartitions: Int, recordsPerPartition: Int): Set[TopicPartition] = {
+   TestUtils.createTopic(this.zkUtils, topicName, numPartitions, serverCount, this.servers)
+   val topicPartitions = (0 until numPartitions).map { partition =>
+     val tp = new TopicPartition(topicName, partition)
+     sendRecords(recordsPerPartition, tp)
+     tp
+   }
+   topicPartitions.toSet
+ }
}