You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/05 20:08:16 UTC
[kafka] branch trunk updated: KAFKA-7893;
Refactor ConsumerBounceTest to reuse functionality from
BaseConsumerTest (#6238)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cc4fde3 KAFKA-7893; Refactor ConsumerBounceTest to reuse functionality from BaseConsumerTest (#6238)
cc4fde3 is described below
commit cc4fde35c9cc2818af1bcb6861ce32dee0f41677
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Fri Apr 5 13:08:04 2019 -0700
KAFKA-7893; Refactor ConsumerBounceTest to reuse functionality from BaseConsumerTest (#6238)
This PR should help address the flakiness in the ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup test (https://issues.apache.org/jira/browse/KAFKA-7965). I tested this locally and verified that it significantly reduces flakiness: with this change, 25 out of 25 runs pass, whereas running the test 25 times on trunk yielded only `18/25` passes.
It does so by reusing the less-flaky consumer integration testing functionality inside `BaseConsumerTest`. Most notably, the test now makes use of the `ConsumerAssignmentPoller` class - each consumer now polls non-stop rather than the more batch-oriented polling we had in `ConsumerBounceTest#waitForRebalance()`.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/api/AdminClientIntegrationTest.scala | 34 +--
.../kafka/api/AuthorizerIntegrationTest.scala | 4 +-
.../integration/kafka/api/BaseConsumerTest.scala | 172 ++++++++++++--
.../integration/kafka/api/BaseQuotaTest.scala | 4 +-
.../integration/kafka/api/ConsumerBounceTest.scala | 250 +++++++--------------
.../kafka/api/CustomQuotaCallbackTest.scala | 4 +-
.../api/DescribeAuthorizedOperationsTest.scala | 2 +-
.../kafka/api/EndToEndAuthorizationTest.scala | 2 +-
.../kafka/api/GroupAuthorizerIntegrationTest.scala | 4 +-
.../kafka/api/IntegrationTestHarness.scala | 18 +-
.../kafka/api/LegacyAdminClientTest.scala | 4 +-
.../integration/kafka/api/LogAppendTimeTest.scala | 2 +-
.../scala/integration/kafka/api/MetricsTest.scala | 2 +-
.../kafka/api/PlaintextConsumerTest.scala | 175 +++++----------
.../SaslClientsWithInvalidCredentialsTest.scala | 4 +-
.../kafka/api/SaslPlainPlaintextConsumerTest.scala | 7 +-
.../kafka/network/DynamicConnectionQuotaTest.scala | 8 +-
.../kafka/server/GssapiAuthenticationTest.scala | 4 +-
.../kafka/server/ScramServerStartupTest.scala | 2 +-
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 2 +-
.../kafka/admin/DelegationTokenCommandTest.scala | 6 +-
.../server/AbstractCreateTopicsRequestTest.scala | 2 +-
.../server/AddPartitionsToTxnRequestTest.scala | 2 +-
.../server/AlterReplicaLogDirsRequestTest.scala | 2 +-
.../unit/kafka/server/ApiVersionsRequestTest.scala | 2 +-
.../scala/unit/kafka/server/BaseRequestTest.scala | 20 +-
.../kafka/server/CreateTopicsRequestTest.scala | 4 +-
.../server/CreateTopicsRequestWithPolicyTest.scala | 6 +-
.../DelegationTokenRequestsOnPlainTextTest.scala | 2 +-
.../kafka/server/DelegationTokenRequestsTest.scala | 6 +-
...nTokenRequestsWithDisableTokenFeatureTest.scala | 2 +-
...leteTopicsRequestWithDeletionDisabledTest.scala | 6 +-
.../kafka/server/DescribeLogDirsRequestTest.scala | 2 +-
.../FetchRequestDownConversionConfigTest.scala | 6 +-
.../KafkaMetricReporterExceptionHandlingTest.scala | 4 +-
.../unit/kafka/server/LogDirFailureTest.scala | 8 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 4 +-
.../unit/kafka/server/MetadataRequestTest.scala | 2 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +-
.../kafka/server/SaslApiVersionsRequestTest.scala | 2 +-
.../unit/kafka/server/StopReplicaRequestTest.scala | 2 +-
41 files changed, 393 insertions(+), 405 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 5c72cbf..dbb6213 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -87,12 +87,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
super.tearDown()
}
- val serverCount = 3
+ val brokerCount = 3
val consumerCount = 1
val producerCount = 1
override def generateConfigs = {
- val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+ val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = 2)
cfgs.foreach { config =>
config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
@@ -197,7 +197,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(3, partition.replicas.size)
partition.replicas.asScala.foreach { replica =>
assertTrue(replica.id >= 0)
- assertTrue(replica.id < serverCount)
+ assertTrue(replica.id < brokerCount)
}
assertEquals("No duplicate replica ids", partition.replicas.size, partition.replicas.asScala.map(_.id).distinct.size)
@@ -301,10 +301,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val topic = "topic"
val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1)
val partitionsByBroker = leaderByPartition.groupBy { case (partitionId, leaderId) => leaderId }.mapValues(_.keys.toSeq)
- val brokers = (0 until serverCount).map(Integer.valueOf)
+ val brokers = (0 until brokerCount).map(Integer.valueOf)
val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get
- (0 until serverCount).foreach { brokerId =>
+ (0 until brokerCount).foreach { brokerId =>
val server = servers.find(_.config.brokerId == brokerId).get
val expectedPartitions = partitionsByBroker(brokerId)
val logDirInfos = logDirInfosByBroker.get(brokerId)
@@ -361,7 +361,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertTrue(exception.getCause.isInstanceOf[UnknownTopicOrPartitionException])
}
- createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+ createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
servers.foreach { server =>
val logDir = server.logManager.getLog(tp).get.dir.getParent
assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
@@ -759,7 +759,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testSeekAfterDeleteRecords(): Unit = {
- createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+ createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = AdminClient.create(createConfig)
@@ -788,7 +788,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testLogStartOffsetCheckpoint(): Unit = {
- createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+ createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = AdminClient.create(createConfig)
@@ -801,7 +801,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
var lowWatermark: Option[Long] = Some(result.lowWatermarks.get(topicPartition).get.lowWatermark)
assertEquals(Some(5), lowWatermark)
- for (i <- 0 until serverCount) {
+ for (i <- 0 until brokerCount) {
killBroker(i)
}
restartDeadBrokers()
@@ -828,7 +828,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testLogStartOffsetAfterDeleteRecords(): Unit = {
- createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+ createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = AdminClient.create(createConfig)
@@ -842,13 +842,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark
assertEquals(3L, lowWatermark)
- for (i <- 0 until serverCount)
+ for (i <- 0 until brokerCount)
assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
}
@Test
def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
- val leaders = createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+ val leaders = createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
@@ -881,7 +881,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L)
// after the new replica caught up, all replicas should have same log start offset
- for (i <- 0 until serverCount)
+ for (i <- 0 until brokerCount)
assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
// kill the same follower again, produce more records, and delete records beyond follower's LEO
@@ -896,7 +896,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testAlterLogDirsAfterDeleteRecords(): Unit = {
client = AdminClient.create(createConfig)
- createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+ createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
val expectedLEO = 100
val producer = createProducer()
sendRecords(producer, expectedLEO, topicPartition)
@@ -905,7 +905,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
result.all().get()
// make sure we are in the expected state after delete records
- for (i <- 0 until serverCount) {
+ for (i <- 0 until brokerCount) {
assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
assertEquals(expectedLEO, servers(i).replicaManager.localReplica(topicPartition).get.logEndOffset)
}
@@ -927,7 +927,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testOffsetsForTimesAfterDeleteRecords(): Unit = {
- createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+ createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = AdminClient.create(createConfig)
@@ -999,7 +999,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testDescribeConfigsForTopic(): Unit = {
- createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+ createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
client = AdminClient.create(createConfig)
val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a7cb654..336bfb1 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -54,7 +54,7 @@ import scala.collection.mutable.Buffer
class AuthorizerIntegrationTest extends BaseRequestTest {
- override def numBrokers: Int = 1
+ override def brokerCount: Int = 1
val brokerId: Integer = 0
def userPrincipal = KafkaPrincipal.ANONYMOUS
@@ -103,7 +103,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000")
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
- override def propertyOverrides(properties: Properties): Unit = {
+ override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 1645b5b..6b8b502 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -14,13 +14,14 @@ package kafka.api
import java.time.Duration
import java.util
+import java.util.Properties
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import kafka.utils.{ShutdownableThread, TestUtils}
-import kafka.server.KafkaConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
import org.junit.Assert._
import org.junit.{Before, Test}
@@ -30,43 +31,49 @@ import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.internals.Topic
+import scala.collection.mutable
+
/**
* Integration tests for the consumer that cover basic usage as well as server failures
*/
-abstract class BaseConsumerTest extends IntegrationTestHarness {
+abstract class BaseConsumerTest extends BaseRequestTest {
val epsilon = 0.1
- val serverCount = 3
+ override def brokerCount: Int = 3
val topic = "topic"
val part = 0
val tp = new TopicPartition(topic, part)
val part2 = 1
val tp2 = new TopicPartition(topic, part2)
+ val group = "my-test"
val producerClientId = "ConsumerTestProducer"
val consumerClientId = "ConsumerTestConsumer"
+ val groupMaxSessionTimeoutMs = 30000L
- // configure the servers and clients
- this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
- this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
- this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
- this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
- this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
- this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10")
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
- this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+ this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
+ override protected def brokerPropertyOverrides(properties: Properties): Unit = {
+ properties.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+ properties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+ properties.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+ properties.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
+ properties.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, groupMaxSessionTimeoutMs.toString)
+ properties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10")
+ }
+
@Before
override def setUp() {
super.setUp()
// create the test topic with all the brokers as replicas
- createTopic(topic, 2, serverCount)
+ createTopic(topic, 2, brokerCount)
}
@Test
@@ -90,7 +97,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
@Test
def testCoordinatorFailover() {
val listener = new TestConsumerReassignmentListener()
- this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
+ this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000")
val consumer = createConsumer()
@@ -130,6 +137,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
}
}
+ protected def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
+ val groupOverrideConfig = new Properties
+ groupOverrideConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+ createConsumer(configOverrides = groupOverrideConfig)
+ }
+
protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
tp: TopicPartition): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
val records = (0 until numRecords).map { i =>
@@ -223,6 +236,100 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
assertEquals(None, commitCallback.error)
}
+ /**
+ * Create 'numOfConsumersToAdd' consumers, add them to the consumer group 'consumerGroup', and create corresponding
+ * pollers for these consumers. Wait for partition re-assignment and validate.
+ *
+ * Currently, assignment validation requires that the total number of partitions is greater than or equal to the
+ * number of consumers, so subscriptions.size must be greater than or equal to the resulting number of consumers in the group
+ *
+ * @param numOfConsumersToAdd number of consumers to create and add to the consumer group
+ * @param consumerGroup current consumer group
+ * @param consumerPollers current consumer pollers
+ * @param topicsToSubscribe topics to which new consumers will subscribe
+ * @param subscriptions set of all topic partitions
+ */
+ def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int,
+ consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
+ consumerPollers: mutable.Buffer[ConsumerAssignmentPoller],
+ topicsToSubscribe: List[String],
+ subscriptions: Set[TopicPartition],
+ group: String = group): (mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], mutable.Buffer[ConsumerAssignmentPoller]) = {
+ assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
+ addConsumersToGroup(numOfConsumersToAdd, consumerGroup, consumerPollers, topicsToSubscribe, subscriptions, group)
+ // wait until topics get re-assigned and validate assignment
+ validateGroupAssignment(consumerPollers, subscriptions)
+
+ (consumerGroup, consumerPollers)
+ }
+
+ /**
+ * Create 'numOfConsumersToAdd' consumers, add them to the consumer group 'consumerGroup', and create corresponding
+ * pollers for these consumers.
+ *
+ *
+ * @param numOfConsumersToAdd number of consumers to create and add to the consumer group
+ * @param consumerGroup current consumer group
+ * @param consumerPollers current consumer pollers
+ * @param topicsToSubscribe topics to which new consumers will subscribe
+ * @param subscriptions set of all topic partitions
+ */
+ def addConsumersToGroup(numOfConsumersToAdd: Int,
+ consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
+ consumerPollers: mutable.Buffer[ConsumerAssignmentPoller],
+ topicsToSubscribe: List[String],
+ subscriptions: Set[TopicPartition],
+ group: String = group): (mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], mutable.Buffer[ConsumerAssignmentPoller]) = {
+ for (_ <- 0 until numOfConsumersToAdd) {
+ val consumer = createConsumerWithGroupId(group)
+ consumerGroup += consumer
+ consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
+ }
+
+ (consumerGroup, consumerPollers)
+ }
+
+ /**
+ * Wait for consumers to get partition assignment and validate it.
+ *
+ * @param consumerPollers consumer pollers corresponding to the consumer group we are testing
+ * @param subscriptions set of all topic partitions
+ * @param msg message to print when waiting for/validating assignment fails
+ */
+ def validateGroupAssignment(consumerPollers: mutable.Buffer[ConsumerAssignmentPoller],
+ subscriptions: Set[TopicPartition],
+ msg: Option[String] = None,
+ waitTime: Long = 10000L): Unit = {
+ val assignments = mutable.Buffer[Set[TopicPartition]]()
+ TestUtils.waitUntilTrue(() => {
+ assignments.clear()
+ consumerPollers.foreach(assignments += _.consumerAssignment())
+ isPartitionAssignmentValid(assignments, subscriptions)
+ }, msg.getOrElse(s"Did not get valid assignment for partitions $subscriptions. Instead, got $assignments"), waitTime)
+ }
+
+ /**
+ * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates
+ * consumer poller and starts polling.
+ * Assumes that the consumer is not subscribed to any topics yet
+ *
+ * @param consumer consumer
+ * @param topicsToSubscribe topics that this consumer will subscribe to
+ * @return consumer poller for the given consumer
+ */
+ def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]],
+ topicsToSubscribe: List[String],
+ partitionsToAssign: Set[TopicPartition] = Set.empty[TopicPartition]): ConsumerAssignmentPoller = {
+ assertEquals(0, consumer.assignment().size)
+ val consumerPoller = if (topicsToSubscribe.nonEmpty)
+ new ConsumerAssignmentPoller(consumer, topicsToSubscribe)
+ else
+ new ConsumerAssignmentPoller(consumer, partitionsToAssign)
+
+ consumerPoller.start()
+ consumerPoller
+ }
+
protected def awaitRebalance(consumer: Consumer[_, _], rebalanceListener: TestConsumerReassignmentListener): Unit = {
val numReassignments = rebalanceListener.callsToAssigned
TestUtils.pollUntilTrue(consumer, () => rebalanceListener.callsToAssigned > numReassignments,
@@ -253,13 +360,25 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
}
protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]],
- topicsToSubscribe: List[String]) extends ShutdownableThread("daemon-consumer-assignment", false)
+ topicsToSubscribe: List[String],
+ partitionsToAssign: Set[TopicPartition]) extends ShutdownableThread("daemon-consumer-assignment", false)
{
- @volatile private var partitionAssignment: Set[TopicPartition] = Set.empty[TopicPartition]
- private var topicsSubscription = topicsToSubscribe
+ def this(consumer: Consumer[Array[Byte], Array[Byte]], topicsToSubscribe: List[String]) {
+ this(consumer, topicsToSubscribe, Set.empty[TopicPartition])
+ }
+
+ def this(consumer: Consumer[Array[Byte], Array[Byte]], partitionsToAssign: Set[TopicPartition]) {
+ this(consumer, List.empty[String], partitionsToAssign)
+ }
+
+ @volatile var thrownException: Option[Throwable] = None
+ @volatile var receivedMessages = 0
+
+ @volatile private var partitionAssignment: Set[TopicPartition] = partitionsToAssign
@volatile private var subscriptionChanged = false
+ private var topicsSubscription = topicsToSubscribe
- val rebalanceListener = new ConsumerRebalanceListener {
+ val rebalanceListener: ConsumerRebalanceListener = new ConsumerRebalanceListener {
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*)
}
@@ -268,7 +387,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
partitionAssignment = Set.empty[TopicPartition]
}
}
- consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
+ if (partitionAssignment.isEmpty) {
+ consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
+ } else {
+ consumer.assign(partitionAssignment.asJava)
+ }
def consumerAssignment(): Set[TopicPartition] = {
partitionAssignment
@@ -285,14 +408,16 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
* @param newTopicsToSubscribe
*/
def subscribe(newTopicsToSubscribe: List[String]): Unit = {
- if (subscriptionChanged) {
+ if (subscriptionChanged)
throw new IllegalStateException("Do not call subscribe until the previous subscribe request is processed.")
- }
+ if (partitionsToAssign.nonEmpty)
+ throw new IllegalStateException("Cannot call subscribe when configured to use manual partition assignment")
+
topicsSubscription = newTopicsToSubscribe
subscriptionChanged = true
}
- def isSubscribeRequestProcessed(): Boolean = {
+ def isSubscribeRequestProcessed: Boolean = {
!subscriptionChanged
}
@@ -308,9 +433,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
subscriptionChanged = false
}
try {
- consumer.poll(Duration.ofMillis(50))
+ receivedMessages += consumer.poll(Duration.ofMillis(50)).count()
} catch {
case _: WakeupException => // ignore for shutdown
+ case e: Throwable =>
+ thrownException = Some(e)
+ throw e
}
}
}
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 4b278f0..d9bc646 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
abstract class BaseQuotaTest extends IntegrationTestHarness {
- override val serverCount = 2
+ override val brokerCount = 2
protected def producerClientId = "QuotasTestProducer-1"
protected def consumerClientId = "QuotasTestConsumer-1"
@@ -70,7 +70,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
super.setUp()
val numPartitions = 1
- val leaders = createTopic(topic1, numPartitions, serverCount)
+ val leaders = createTopic(topic1, numPartitions, brokerCount)
leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1)
followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1)
quotaTestClients = createQuotaTestClients(topic1, leaderNode)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 1a1f37e..38650b4 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -15,12 +15,10 @@ package kafka.api
import java.time
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantLock
import java.util.{Collection, Collections, Properties}
import util.control.Breaks._
-import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, Logging, ShutdownableThread, TestUtils}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -29,25 +27,23 @@ import org.apache.kafka.common.errors.GroupMaxSizeReachedException
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
import org.junit.Assert._
-import org.junit.{After, Before, Ignore, Test}
+import org.junit.{After, Ignore, Test}
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future => SFuture}
+import scala.collection.mutable
/**
* Integration tests for the consumer that cover basic usage as well as server failures
*/
-class ConsumerBounceTest extends BaseRequestTest with Logging {
- val topic = "topic"
- val part = 0
- val tp = new TopicPartition(topic, part)
+class ConsumerBounceTest extends BaseConsumerTest with Logging {
val maxGroupSize = 5
// Time to process commit and leave group requests in tests when brokers are available
- val gracefulCloseTimeMs = 1000
- val executor = Executors.newScheduledThreadPool(2)
+ val gracefulCloseTimeMs = Some(1000L)
+ val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
+ val consumerPollers: mutable.Buffer[ConsumerAssignmentPoller] = mutable.Buffer[ConsumerAssignmentPoller]()
+
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
override def generateConfigs = {
generateKafkaConfigs()
@@ -63,21 +59,14 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
properties.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
- FixedPortTestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)
+ FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, properties))
}
- @Before
- override def setUp() {
- super.setUp()
-
- // create the test topic with all the brokers as replicas
- createTopic(topic, 1, numBrokers)
- }
-
@After
override def tearDown() {
try {
+ consumerPollers.foreach(_.shutdown())
executor.shutdownNow()
// Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread
assertTrue("Executor did not terminate", executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
@@ -176,7 +165,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
val consumer = createConsumer()
consumer.subscribe(Collections.singleton(newtopic))
executor.schedule(new Runnable {
- def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers)
+ def run() = createTopic(newtopic, numPartitions = brokerCount, replicationFactor = brokerCount)
}, 2, TimeUnit.SECONDS)
consumer.poll(time.Duration.ZERO)
@@ -201,18 +190,22 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
assertEquals(0, remainingRecords)
}
+ val poller = new ConsumerAssignmentPoller(consumer, List(newtopic))
+ consumerPollers += poller
+ poller.start()
sendRecords(numRecords, newtopic)
- receiveRecords(consumer, numRecords, 10000)
+ receiveExactRecords(poller, numRecords, 10000)
+ poller.shutdown()
servers.foreach(server => killBroker(server.config.brokerId))
Thread.sleep(500)
restartDeadBrokers()
- val future = executor.submit(new Runnable {
- def run() = receiveRecords(consumer, numRecords, 10000)
- })
+ val poller2 = new ConsumerAssignmentPoller(consumer, List(newtopic))
+ consumerPollers += poller2
+ poller2.start()
sendRecords(numRecords, newtopic)
- future.get
+ receiveExactRecords(poller, numRecords, 10000L)
}
@Test
@@ -233,7 +226,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
*/
private def checkCloseGoodPath(numRecords: Int, groupId: String) {
val consumer = createConsumerAndReceive(groupId, false, numRecords)
- val future = submitCloseAndValidate(consumer, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+ val future = submitCloseAndValidate(consumer, Long.MaxValue, None, gracefulCloseTimeMs)
future.get
checkClosedState(groupId, numRecords)
}
@@ -251,8 +244,8 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
killBroker(findCoordinator(dynamicGroup))
killBroker(findCoordinator(manualGroup))
- val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
- val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+ val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs)
+ val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, gracefulCloseTimeMs)
future1.get
future2.get
@@ -302,6 +295,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
*/
@Test
def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
+ val group = "group-max-size-test"
val topic = "group-max-size-test"
val maxGroupSize = 2
val consumerCount = maxGroupSize + 1
@@ -311,77 +305,51 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
// ensure even record distribution per partition
recordsProduced += partitionCount - recordsProduced % partitionCount
}
- val executor = Executors.newScheduledThreadPool(consumerCount * 2)
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- val producer = createProducer()
- createTopic(topic, numPartitions = partitionCount, replicationFactor = numBrokers)
- val stableConsumers = createConsumersWithGroupId("group2", consumerCount, executor, topic = topic)
+ val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount)
- // assert group is stable and working
- sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount))
- stableConsumers.foreach { cons => {
- receiveAndCommit(cons, recordsProduced / consumerCount, 10000)
- }}
+ addConsumersToGroupAndWaitForGroupAssignment(consumerCount, mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](),
+ consumerPollers, List[String](topic), partitions, group)
// roll all brokers with a lesser max group size to make sure coordinator has the new config
val newConfigs = generateKafkaConfigs(maxGroupSize.toString)
- val kickedConsumerOut = new AtomicBoolean(false)
var kickedOutConsumerIdx: Option[Int] = None
- val lock = new ReentrantLock
// restart brokers until the group moves to a Coordinator with the new config
breakable { for (broker <- servers.indices) {
killBroker(broker)
- sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount))
-
- var successfulConsumes = 0
-
- // compute consumptions in a non-blocking way in order to account for the rebalance once the group.size takes effect
- val consumeFutures = new ArrayBuffer[SFuture[Any]]
- implicit val executorContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
- stableConsumers.indices.foreach(idx => {
- val currentConsumer = stableConsumers(idx)
- val consumeFuture = SFuture {
- try {
- receiveAndCommit(currentConsumer, recordsProduced / consumerCount, 10000)
- CoreUtils.inLock(lock) { successfulConsumes += 1 }
- } catch {
- case e: Throwable =>
- if (!e.isInstanceOf[GroupMaxSizeReachedException]) {
- throw e
- }
- if (!kickedConsumerOut.compareAndSet(false, true)) {
- fail(s"Received more than one ${classOf[GroupMaxSizeReachedException]}")
- }
- kickedOutConsumerIdx = Some(idx)
- }
+ consumerPollers.indices.foreach(idx => {
+ consumerPollers(idx).thrownException match {
+ case Some(thrownException) =>
+ if (!thrownException.isInstanceOf[GroupMaxSizeReachedException]) {
+ throw thrownException
+ }
+ if (kickedOutConsumerIdx.isDefined) {
+ fail(s"Received more than one ${classOf[GroupMaxSizeReachedException]}")
+ }
+ kickedOutConsumerIdx = Some(idx)
+ case None =>
}
-
- consumeFutures += consumeFuture
})
- Await.result(SFuture.sequence(consumeFutures), Duration("12sec"))
- if (kickedConsumerOut.get()) {
- // validate the rest N-1 consumers consumed successfully
- assertEquals(maxGroupSize, successfulConsumes)
+ if (kickedOutConsumerIdx.isDefined)
break
- }
val config = newConfigs(broker)
servers(broker) = TestUtils.createServer(config, time = brokerTime(config.brokerId))
restartDeadBrokers()
}}
- if (!kickedConsumerOut.get())
+ if (kickedOutConsumerIdx.isEmpty)
fail(s"Should have received an ${classOf[GroupMaxSizeReachedException]} during the cluster roll")
+ restartDeadBrokers()
// assert that the group has gone through a rebalance and shed off one consumer
- stableConsumers.remove(kickedOutConsumerIdx.get)
- sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount))
- // should be only maxGroupSize consumers left in the group
- stableConsumers.foreach { cons => {
- receiveAndCommit(cons, recordsProduced / maxGroupSize, 10000)
- }}
+ consumerPollers.remove(kickedOutConsumerIdx.get).shutdown()
+ sendRecords(createProducer(), recordsProduced, topic, numPartitions = Some(partitionCount))
+ TestUtils.waitUntilTrue(() => {
+ consumerPollers.forall(p => p.receivedMessages >= recordsProduced / consumerCount)
+ }, "The remaining consumers in the group could not fetch the expected records", 10000L)
}
/**
@@ -389,70 +357,30 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
*/
@Test
def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(): Unit = {
- val topic = "group-max-size-test"
- val groupId = "group1"
- val executor = Executors.newScheduledThreadPool(maxGroupSize * 2)
- createTopic(topic, maxGroupSize, numBrokers)
+ val group = "fatal-exception-test"
+ val topic = "fatal-exception-test"
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount)
+
// Create N+1 consumers in the same consumer group and assert that the N+1th consumer receives a fatal error when it tries to join the group
- val stableConsumers = createConsumersWithGroupId(groupId, maxGroupSize, executor, topic)
- val newConsumer = createConsumerWithGroupId(groupId)
- var failedRebalance = false
- var exception: Exception = null
- waitForRebalance(5000, subscribeAndPoll(newConsumer, executor = executor, onException = e => {failedRebalance = true; exception = e}),
- executor = executor, stableConsumers:_*)
- assertTrue("Rebalance did not fail as expected", failedRebalance)
- assertTrue(exception.isInstanceOf[GroupMaxSizeReachedException])
+ addConsumersToGroupAndWaitForGroupAssignment(maxGroupSize, mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](),
+ consumerPollers, List[String](topic), partitions, group)
+ val (_, rejectedConsumerPollers) = addConsumersToGroup(1,
+ mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](), mutable.Buffer[ConsumerAssignmentPoller](), List[String](topic), partitions, group)
+ val rejectedConsumer = rejectedConsumerPollers.head
+ TestUtils.waitUntilTrue(() => {
+ rejectedConsumer.thrownException.isDefined
+ }, "Extra consumer did not throw an exception")
+ assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException])
// assert group continues to live
- val producer = createProducer()
- sendRecords(producer, maxGroupSize * 100, topic, numPartitions = Some(maxGroupSize))
- stableConsumers.foreach { cons => {
- receiveExactRecords(cons, 100, 10000)
- }}
- }
-
- /**
- * Creates N consumers with the same group ID and ensures the group rebalances properly at each step
- */
- private def createConsumersWithGroupId(groupId: String, consumerCount: Int, executor: ExecutorService, topic: String): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = {
- val stableConsumers = ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
- for (_ <- 1.to(consumerCount)) {
- val newConsumer = createConsumerWithGroupId(groupId)
- waitForRebalance(5000, subscribeAndPoll(newConsumer, executor = executor, topic = topic),
- executor = executor, stableConsumers:_*)
- stableConsumers += newConsumer
- }
- stableConsumers
- }
-
- def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], executor: ExecutorService, revokeSemaphore: Option[Semaphore] = None,
- onException: Exception => Unit = e => { throw e }, topic: String = topic, pollTimeout: Int = 1000): Future[Any] = {
- executor.submit(CoreUtils.runnable {
- try {
- consumer.subscribe(Collections.singletonList(topic))
- consumer.poll(java.time.Duration.ofMillis(pollTimeout))
- } catch {
- case e: Exception => onException.apply(e)
- }
- }, 0)
- }
-
- def waitForRebalance(timeoutMs: Long, future: Future[Any], executor: ExecutorService, otherConsumers: KafkaConsumer[Array[Byte], Array[Byte]]*) {
- val startMs = System.currentTimeMillis
- implicit val executorContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
-
- while (System.currentTimeMillis < startMs + timeoutMs && !future.isDone) {
- val consumeFutures = otherConsumers.map(consumer => SFuture {
- consumer.poll(time.Duration.ofMillis(1000))
- })
- Await.result(SFuture.sequence(consumeFutures), Duration("1500ms"))
- }
-
- assertTrue("Rebalance did not complete in time", future.isDone)
+ sendRecords(createProducer(), maxGroupSize * 100, topic, numPartitions = Some(partitions.size))
+ TestUtils.waitUntilTrue(() => {
+ consumerPollers.forall(p => p.receivedMessages >= 100)
+ }, "The consumers in the group could not fetch the expected records", 10000L)
}
/**
@@ -463,7 +391,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
@Test
def testCloseDuringRebalance() {
val topic = "closetest"
- createTopic(topic, 10, numBrokers)
+ createTopic(topic, 10, brokerCount)
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -510,7 +438,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
val rebalanceFuture = createConsumerToRebalance()
// consumer1 should leave group and close immediately even though rebalance is in progress
- val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+ val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs)
// Rebalance should complete without waiting for consumer1 to timeout since consumer1 has left the group
waitForRebalance(2000, rebalanceFuture, consumer2)
@@ -528,40 +456,22 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
closeFuture2.get(2000, TimeUnit.MILLISECONDS)
}
- private def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- createConsumer()
- }
-
private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, numRecords: Int): KafkaConsumer[Array[Byte], Array[Byte]] = {
val consumer = createConsumerWithGroupId(groupId)
- if (manualAssign)
- consumer.assign(Collections.singleton(tp))
- else
- consumer.subscribe(Collections.singleton(topic))
- receiveExactRecords(consumer, numRecords)
- consumer
- }
-
- private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long = 60000): Long = {
- var received = 0L
- val endTimeMs = System.currentTimeMillis + timeoutMs
- while (received < numRecords && System.currentTimeMillis < endTimeMs)
- received += consumer.poll(time.Duration.ofMillis(100)).count()
-
- received
- }
+ val consumerPoller = if (manualAssign)
+ subscribeConsumerAndStartPolling(consumer, List(), Set(tp))
+ else
+ subscribeConsumerAndStartPolling(consumer, List(topic))
- private def receiveExactRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long = 60000): Unit = {
- val received = receiveRecords(consumer, numRecords, timeoutMs)
- assertEquals(numRecords, received)
+ receiveExactRecords(consumerPoller, numRecords)
+ consumerPoller.shutdown()
+ consumer
}
- @throws(classOf[CommitFailedException])
- private def receiveAndCommit(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long): Unit = {
- val received = receiveRecords(consumer, numRecords, timeoutMs)
- assertTrue(s"Received $received, expected at least $numRecords", numRecords <= received)
- consumer.commitSync()
+ private def receiveExactRecords(consumer: ConsumerAssignmentPoller, numRecords: Int, timeoutMs: Long = 60000): Unit = {
+ TestUtils.waitUntilTrue(() => {
+ consumer.receivedMessages == numRecords
+ }, s"Consumer did not receive expected $numRecords. It received ${consumer.receivedMessages}", timeoutMs)
}
private def submitCloseAndValidate(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
@@ -617,6 +527,12 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
}
}
+ private def createTopicPartitions(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
+ topicConfig: Properties = new Properties): Set[TopicPartition] = {
+ createTopic(topic, numPartitions = numPartitions, replicationFactor = replicationFactor, topicConfig = topicConfig)
+ Range(0, numPartitions).map(part => new TopicPartition(topic, part)).toSet
+ }
+
private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
numRecords: Int,
topic: String = this.topic,
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 8c1d34d..1394c83 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -48,7 +48,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
override protected def interBrokerListenerName: ListenerName = new ListenerName("BROKER")
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
- override val serverCount: Int = 2
+ override val brokerCount: Int = 2
private val kafkaServerSaslMechanisms = Seq("SCRAM-SHA-256")
private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
@@ -161,7 +161,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
- assertEquals(serverCount, callbackInstances.get)
+ assertEquals(brokerCount, callbackInstances.get)
}
/**
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 78fc215..57fd552 100644
--- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -30,7 +30,7 @@ import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup {
- override val serverCount = 1
+ override val brokerCount = 1
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 0587a6d..49977d0 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -58,7 +58,7 @@ import scala.collection.JavaConverters._
* would end up with ZooKeeperTestHarness twice.
*/
abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
- override val serverCount = 3
+ override val brokerCount = 3
override def configureSecurityBeforeServersStart() {
AclCommand.main(clusterActionArgs)
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 27c3f31..6d2161d 100644
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -33,9 +33,9 @@ class GroupAuthorizerIntegrationTest extends AuthorizerIntegrationTest {
override val kafkaPrincipalType = GroupPrincipalType
override def userPrincipal = TestGroupPrincipal
- override def propertyOverrides(properties: Properties): Unit = {
+ override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
classOf[GroupPrincipalBuilder].getName)
- super.propertyOverrides(properties)
+ super.brokerPropertyOverrides(properties)
}
}
\ No newline at end of file
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 39f47ab..5ffbc43 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -37,7 +37,7 @@ import scala.collection.mutable
* A helper class for writing integration tests that involve producers, consumers, and servers
*/
abstract class IntegrationTestHarness extends KafkaServerTestHarness {
- protected def serverCount: Int
+ protected def brokerCount: Int
protected def logDirCount: Int = 1
val producerConfig = new Properties
@@ -49,10 +49,20 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
protected def interBrokerListenerName: ListenerName = listenerName
+ protected def modifyConfigs(props: Seq[Properties]): Unit = {
+ configureListeners(props)
+ props.foreach(_ ++= serverConfig)
+ }
+
override def generateConfigs: Seq[KafkaConfig] = {
- val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+ val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
- cfgs.foreach { config =>
+ modifyConfigs(cfgs)
+ cfgs.map(KafkaConfig.fromProps)
+ }
+
+ protected def configureListeners(props: Seq[Properties]): Unit = {
+ props.foreach { config =>
config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
config.setProperty(KafkaConfig.InterBrokerListenerNameProp, interBrokerListenerName.value)
@@ -63,8 +73,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
config.setProperty(KafkaConfig.ListenersProp, listeners)
config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityMap)
}
- cfgs.foreach(_ ++= serverConfig)
- cfgs.map(KafkaConfig.fromProps)
}
@Before
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index 08a0224..b30b363 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -41,7 +41,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
val producerCount = 1
val consumerCount = 2
- val serverCount = 3
+ val brokerCount = 3
val groupId = "my-test"
val clientId = "consumer-498"
@@ -70,7 +70,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
override def setUp() {
super.setUp()
client = AdminClient.createSimplePlaintext(this.brokerList)
- createTopic(topic, 2, serverCount)
+ createTopic(topic, 2, brokerCount)
}
@After
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index 795f954..8d2e66e 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -33,7 +33,7 @@ import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue}
class LogAppendTimeTest extends IntegrationTestHarness {
val producerCount: Int = 1
val consumerCount: Int = 1
- val serverCount: Int = 2
+ val brokerCount: Int = 2
// This will be used for the offsets topic as well
serverConfig.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.name)
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index b2f5a7e..da466b8 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
class MetricsTest extends IntegrationTestHarness with SaslSetup {
- override val serverCount = 1
+ override val brokerCount = 1
override protected def listenerName = new ListenerName("CLIENT")
private val kafkaClientSaslMechanism = "PLAIN"
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c11fc12..e3251a5 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -19,7 +19,6 @@ import java.util.regex.Pattern
import java.util.{Collections, Locale, Optional, Properties}
import kafka.log.LogConfig
-import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
@@ -38,6 +37,8 @@ import scala.collection.mutable.Buffer
import kafka.server.QuotaType
import kafka.server.KafkaServer
+import scala.collection.mutable
+
/* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
class PlaintextConsumerTest extends BaseConsumerTest {
@@ -342,17 +343,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
sendRecords(producer, numRecords, tp)
val topic1 = "tblablac" // matches subscribed pattern
- createTopic(topic1, 2, serverCount)
+ createTopic(topic1, 2, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
val topic2 = "tblablak" // does not match subscribed pattern
- createTopic(topic2, 2, serverCount)
+ createTopic(topic2, 2, brokerCount)
sendRecords(producer,numRecords = 1000, new TopicPartition(topic2, 0))
sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1))
val topic3 = "tblab1" // does not match subscribed pattern
- createTopic(topic3, 2, serverCount)
+ createTopic(topic3, 2, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0))
sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1))
@@ -370,7 +371,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
awaitAssignment(consumer, assignment)
val topic4 = "tsomec" // matches subscribed pattern
- createTopic(topic4, 2, serverCount)
+ createTopic(topic4, 2, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0))
sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1))
@@ -404,7 +405,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// the first topic ('topic') matches first subscription pattern only
val fooTopic = "foo" // matches both subscription patterns
- createTopic(fooTopic, 1, serverCount)
+ createTopic(fooTopic, 1, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0))
assertEquals(0, consumer.assignment().size)
@@ -419,7 +420,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
awaitAssignment(consumer, assignment)
val barTopic = "bar" // matches the next subscription pattern
- createTopic(barTopic, 1, serverCount)
+ createTopic(barTopic, 1, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(barTopic, 0))
val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this
@@ -450,7 +451,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
sendRecords(producer, numRecords, tp)
val topic1 = "tblablac" // matches the subscription pattern
- createTopic(topic1, 2, serverCount)
+ createTopic(topic1, 2, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
@@ -517,7 +518,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer.subscribe(List(topic).asJava)
awaitAssignment(consumer, initialAssignment)
- createTopic(otherTopic, 2, serverCount)
+ createTopic(otherTopic, 2, brokerCount)
val expandedAssignment = initialAssignment ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
consumer.subscribe(List(topic, otherTopic).asJava)
awaitAssignment(consumer, expandedAssignment)
@@ -526,7 +527,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testShrinkingTopicSubscriptions() {
val otherTopic = "other"
- createTopic(otherTopic, 2, serverCount)
+ createTopic(otherTopic, 2, brokerCount)
val initialAssignment = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
val consumer = createConsumer()
consumer.subscribe(List(topic, otherTopic).asJava)
@@ -781,7 +782,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val partitionCount = 30
val topics = Seq(topic1, topic2, topic3)
topics.foreach { topicName =>
- createTopic(topicName, partitionCount, serverCount)
+ createTopic(topicName, partitionCount, brokerCount)
}
val partitions = topics.flatMap { topic =>
@@ -861,11 +862,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// for the topic partition assignment
val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
try {
- validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+ validateGroupAssignment(consumerPollers, subscriptions)
// add one more consumer and validate re-assignment
addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers,
- List(topic1, topic2), subscriptions)
+ List(topic1, topic2), subscriptions, "roundrobin-group")
} finally {
consumerPollers.foreach(_.shutdown())
}
@@ -900,11 +901,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// create a group of consumers, subscribe the consumers to the single topic and start polling
// for the topic partition assignment
val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(9, List(topic), partitions)
- validateGroupAssignment(consumerPollers, partitions, s"Did not get valid initial assignment for partitions ${partitions.asJava}")
+ validateGroupAssignment(consumerPollers, partitions)
val prePartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
// add one more consumer and validate re-assignment
- addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, List(topic), partitions)
+ addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, List(topic), partitions, "sticky-group")
val postPartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
val keys = prePartition2PollerId.keySet.union(postPartition2PollerId.keySet)
@@ -945,7 +946,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val consumerPollers = subscribeConsumers(consumersInGroup, List(topic, topic1))
try {
- validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+ validateGroupAssignment(consumerPollers, subscriptions)
// add 2 more consumers and validate re-assignment
addConsumersToGroupAndWaitForGroupAssignment(2, consumersInGroup, consumerPollers, List(topic, topic1), subscriptions)
@@ -1040,7 +1041,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testAutoCommitIntercept() {
val topic2 = "topic2"
- createTopic(topic2, 2, serverCount)
+ createTopic(topic2, 2, brokerCount)
// produce records
val numRecords = 100
@@ -1336,7 +1337,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testAutoCommitOnRebalance() {
val topic2 = "topic2"
- createTopic(topic2, 2, serverCount)
+ createTopic(topic2, 2, brokerCount)
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val consumer = createConsumer()
@@ -1376,7 +1377,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testPerPartitionLeadMetricsCleanUpWithSubscribe() {
val numMessages = 1000
val topic2 = "topic2"
- createTopic(topic2, 2, serverCount)
+ createTopic(topic2, 2, brokerCount)
// send some messages.
val producer = createProducer()
sendRecords(producer, numMessages, tp)
@@ -1415,7 +1416,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testPerPartitionLagMetricsCleanUpWithSubscribe() {
val numMessages = 1000
val topic2 = "topic2"
- createTopic(topic2, 2, serverCount)
+ createTopic(topic2, 2, brokerCount)
// send some messages.
val producer = createProducer()
sendRecords(producer, numMessages, tp)
@@ -1635,16 +1636,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumerPollers += timeoutPoller
// validate the initial assignment
- validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+ validateGroupAssignment(consumerPollers, subscriptions)
// stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers
timeoutPoller.shutdown()
if (closeConsumer)
timeoutConsumer.close()
- val maxSessionTimeout = this.serverConfig.getProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp).toLong
validateGroupAssignment(consumerPollers, subscriptions,
- s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left", 3 * maxSessionTimeout)
+ Some(s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left"), 3 * groupMaxSessionTimeoutMs)
// done with pollers and consumers
for (poller <- consumerPollers)
@@ -1652,6 +1652,24 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
/**
+ * Creates consumer pollers corresponding to a given consumer group, one per consumer; subscribes consumers to
+ * 'topicsToSubscribe' topics, waits until consumers get topics assignment.
+ *
+ * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller.
+ *
+ * @param consumerGroup consumer group
+ * @param topicsToSubscribe topics to which consumers will subscribe to
+ * @return collection of consumer pollers
+ */
+ def subscribeConsumers(consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
+ topicsToSubscribe: List[String]): mutable.Buffer[ConsumerAssignmentPoller] = {
+ val consumerPollers = mutable.Buffer[ConsumerAssignmentPoller]()
+ for (consumer <- consumerGroup)
+ consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
+ consumerPollers
+ }
+
+ /**
* Creates topic 'topicName' with 'numPartitions' partitions and produces 'recordsPerPartition'
* records to each partition
*/
@@ -1659,7 +1677,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
topicName: String,
numPartitions: Int,
recordsPerPartition: Int): Set[TopicPartition] = {
- createTopic(topicName, numPartitions, serverCount)
+ createTopic(topicName, numPartitions, brokerCount)
var parts = Set[TopicPartition]()
for (partition <- 0 until numPartitions) {
val tp = new TopicPartition(topicName, partition)
@@ -1670,51 +1688,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
/**
- * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates
- * consumer poller and starts polling.
- * Assumes that the consumer is not subscribed to any topics yet
- *
- * @param consumer consumer
- * @param topicsToSubscribe topics that this consumer will subscribe to
- * @return consumer poller for the given consumer
- */
- def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]],
- topicsToSubscribe: List[String]): ConsumerAssignmentPoller = {
- assertEquals(0, consumer.assignment().size)
- val consumerPoller = new ConsumerAssignmentPoller(consumer, topicsToSubscribe)
- consumerPoller.start()
- consumerPoller
- }
-
- /**
- * Creates consumer pollers corresponding to a given consumer group, one per consumer; subscribes consumers to
- * 'topicsToSubscribe' topics, waits until consumers get topics assignment.
- *
- * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller.
- *
- * @param consumerGroup consumer group
- * @param topicsToSubscribe topics to which consumers will subscribe to
- * @return collection of consumer pollers
- */
- def subscribeConsumers(consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
- topicsToSubscribe: List[String]): Buffer[ConsumerAssignmentPoller] = {
- val consumerPollers = Buffer[ConsumerAssignmentPoller]()
- for (consumer <- consumerGroup)
- consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
- consumerPollers
- }
-
- /**
- * Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes consumers to
- * 'topicsToSubscribe' topics, waits until consumers get topics assignment.
- *
- * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller.
- *
- * @param consumerCount number of consumers to create
- * @param topicsToSubscribe topics to which consumers will subscribe to
- * @param subscriptions set of all topic partitions
- * @return collection of created consumers and collection of corresponding consumer pollers
- */
+ * Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes consumers to
+ * 'topicsToSubscribe' topics, waits until consumers get topics assignment.
+ *
+ * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller.
+ *
+ * @param consumerCount number of consumers to create
+ * @param topicsToSubscribe topics to which consumers will subscribe to
+ * @param subscriptions set of all topic partitions
+ * @return collection of created consumers and collection of corresponding consumer pollers
+ */
def createConsumerGroupAndWaitForAssignment(consumerCount: Int,
topicsToSubscribe: List[String],
subscriptions: Set[TopicPartition]): (Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], Buffer[ConsumerAssignmentPoller]) = {
@@ -1728,54 +1711,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
(consumerGroup, consumerPollers)
}
- /**
- * Create 'numOfConsumersToAdd' consumers add then to the consumer group 'consumerGroup', and create corresponding
- * pollers for these consumers. Wait for partition re-assignment and validate.
- *
- * Currently, assignment validation requires that total number of partitions is greater or equal to
- * number of consumers, so subscriptions.size must be greater or equal the resulting number of consumers in the group
- *
- * @param numOfConsumersToAdd number of consumers to create and add to the consumer group
- * @param consumerGroup current consumer group
- * @param consumerPollers current consumer pollers
- * @param topicsToSubscribe topics to which new consumers will subscribe to
- * @param subscriptions set of all topic partitions
- */
- def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int,
- consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
- consumerPollers: Buffer[ConsumerAssignmentPoller],
- topicsToSubscribe: List[String],
- subscriptions: Set[TopicPartition]): Unit = {
- assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
- for (_ <- 0 until numOfConsumersToAdd) {
- val consumer = createConsumer()
- consumerGroup += consumer
- consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
- }
-
- // wait until topics get re-assigned and validate assignment
- validateGroupAssignment(consumerPollers, subscriptions,
- s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added $numOfConsumersToAdd consumer(s)")
- }
-
- /**
- * Wait for consumers to get partition assignment and validate it.
- *
- * @param consumerPollers consumer pollers corresponding to the consumer group we are testing
- * @param subscriptions set of all topic partitions
- * @param msg message to print when waiting for/validating assignment fails
- */
- def validateGroupAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller],
- subscriptions: Set[TopicPartition],
- msg: String,
- waitTime: Long = 10000L): Unit = {
- TestUtils.waitUntilTrue(() => {
- val assignments = Buffer[Set[TopicPartition]]()
- consumerPollers.foreach(assignments += _.consumerAssignment())
- isPartitionAssignmentValid(assignments, subscriptions)
- }, msg, waitTime)
- }
-
def changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller],
topicsToSubscribe: List[String],
subscriptions: Set[TopicPartition]): Unit = {
@@ -1785,11 +1720,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// since subscribe call to poller does not actually call consumer subscribe right away, wait
// until subscribe is called on all consumers
TestUtils.waitUntilTrue(() => {
- consumerPollers forall (poller => poller.isSubscribeRequestProcessed())
- }, s"Failed to call subscribe on all consumers in the group for subscription ${subscriptions}", 1000L)
+ consumerPollers.forall { poller => poller.isSubscribeRequestProcessed }
+ }, s"Failed to call subscribe on all consumers in the group for subscription $subscriptions", 1000L)
validateGroupAssignment(consumerPollers, subscriptions,
- s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription")
+ Some(s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription"))
}
def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V],
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index c353b52..bbb3d19 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -39,7 +39,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
val consumerCount = 1
val producerCount = 1
- val serverCount = 1
+ val brokerCount = 1
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
this.serverConfig.setProperty(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
@@ -62,7 +62,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
JaasTestUtils.KafkaServerContextName))
super.setUp()
- createTopic(topic, numPartitions, serverCount)
+ createTopic(topic, numPartitions, brokerCount)
}
@After
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index c15a508..59c6d34 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -13,7 +13,7 @@
package kafka.api
import java.io.File
-import java.util.Locale
+import java.util.{Locale, Properties}
import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils}
@@ -47,6 +47,11 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
closeSasl()
}
+ override def modifyConfigs(props: Seq[Properties]): Unit = {
+ super.modifyConfigs(props)
+ configureListeners(props)
+ }
+
/**
* Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths
* when zookeeper.set.acl=false, even if ZooKeeper is SASL-enabled.
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 41e1ff9..8686575 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -39,7 +39,7 @@ import scala.collection.JavaConverters._
class DynamicConnectionQuotaTest extends BaseRequestTest {
- override def numBrokers = 1
+ override def brokerCount = 1
val topic = "test"
val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
@@ -49,7 +49,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
@Before
override def setUp(): Unit = {
super.setUp()
- TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers)
+ TestUtils.createTopic(zkClient, topic, brokerCount, brokerCount, servers)
}
@After
@@ -64,8 +64,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
}
}
- override protected def propertyOverrides(properties: Properties): Unit = {
- super.propertyOverrides(properties)
+ override protected def brokerPropertyOverrides(properties: Properties): Unit = {
+ super.brokerPropertyOverrides(properties)
}
@Test
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 07885cd..8842171 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -39,7 +39,7 @@ import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
- override val serverCount = 1
+ override val brokerCount = 1
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
private val kafkaClientSaslMechanism = "GSSAPI"
@@ -70,7 +70,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
clientConfig.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, "5000")
// create the test topic with all the brokers as replicas
- createTopic(topic, 2, serverCount)
+ createTopic(topic, 2, brokerCount)
}
@After
diff --git a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
index 0f40650..4c2c640 100644
--- a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
@@ -36,7 +36,7 @@ import scala.collection.JavaConverters._
*/
class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup {
- override val serverCount = 1
+ override val brokerCount = 1
private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
private val kafkaServerSaslMechanisms = Collections.singletonList("SCRAM-SHA-256").asScala
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 3128346..a5f1bab 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
class AddPartitionsTest extends BaseRequestTest {
- protected override def numBrokers: Int = 4
+ override def brokerCount: Int = 4
val partitionId = 0
diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
index a4c27e5..0c9f4d3 100644
--- a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
@@ -39,7 +39,7 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
var adminClient: org.apache.kafka.clients.admin.AdminClient = null
- override def numBrokers = 1
+ override def brokerCount = 1
@Before
override def setUp(): Unit = {
@@ -48,11 +48,11 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
}
override def generateConfigs = {
- val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+ val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect,
enableControlledShutdown = false,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
- props.foreach(propertyOverrides)
+ props.foreach(brokerPropertyOverrides)
props.map(KafkaConfig.fromProps)
}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index d6262b3..a54e5fc 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
class AbstractCreateTopicsRequestTest extends BaseRequestTest {
- override def propertyOverrides(properties: Properties): Unit =
+ override def brokerPropertyOverrides(properties: Properties): Unit =
properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
def topicsReq(topics: Seq[CreatableTopic],
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
index 9071f95..d15409c 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
@@ -31,7 +31,7 @@ class AddPartitionsToTxnRequestTest extends BaseRequestTest {
private val topic1 = "foobartopic"
val numPartitions = 3
- override def propertyOverrides(properties: Properties): Unit =
+ override def brokerPropertyOverrides(properties: Properties): Unit =
properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
@Before
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
index 28ed81d..d61c14c 100644
--- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
@@ -33,7 +33,7 @@ import scala.util.Random
class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
override val logDirCount = 5
- override val numBrokers = 1
+ override val brokerCount = 1
val topic = "topic"
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 83f7111..ab70aec 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -40,7 +40,7 @@ object ApiVersionsRequestTest {
class ApiVersionsRequestTest extends BaseRequestTest {
- override def numBrokers: Int = 1
+ override def brokerCount: Int = 1
@Test
def testApiVersionsRequest() {
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 09ffe4f..a92aeaf 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -24,7 +24,6 @@ import java.util.Properties
import kafka.api.IntegrationTestHarness
import kafka.network.SocketServer
-import kafka.utils._
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.protocol.ApiKeys
@@ -32,22 +31,19 @@ import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestRespons
import org.apache.kafka.common.security.auth.SecurityProtocol
abstract class BaseRequestTest extends IntegrationTestHarness {
- override val serverCount: Int = numBrokers
private var correlationId = 0
// If required, set number of brokers
- protected def numBrokers: Int = 3
+ override def brokerCount: Int = 3
// If required, override properties by mutating the passed Properties object
- protected def propertyOverrides(properties: Properties) {}
-
- override def generateConfigs = {
- val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
- enableControlledShutdown = false,
- interBrokerSecurityProtocol = Some(securityProtocol),
- trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
- props.foreach(propertyOverrides)
- props.map(KafkaConfig.fromProps)
+ protected def brokerPropertyOverrides(properties: Properties) {}
+
+ override def modifyConfigs(props: Seq[Properties]): Unit = {
+ props.foreach { p =>
+ p.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+ brokerPropertyOverrides(p)
+ }
}
def anySocketServer = {
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index db2028c..709b3c9 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -55,7 +55,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions", numPartitions = -1))),
Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)), checkErrorMessage = false)
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
- replicationFactor = numBrokers + 1))),
+ replicationFactor = brokerCount + 1))),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR)), checkErrorMessage = false)
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-config",
config=Map("not.a.property" -> "error")))),
@@ -71,7 +71,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq(existingTopic),
topicReq("partial-partitions", numPartitions = -1),
- topicReq("partial-replication", replicationFactor=numBrokers + 1),
+ topicReq("partial-replication", replicationFactor=brokerCount + 1),
topicReq("partial-assignment", assignment=Map(0 -> List(0, 1), 1 -> List(0))),
topicReq("partial-none"))),
Map(
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index 4fc3244..0395484 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -32,8 +32,8 @@ import scala.collection.JavaConverters._
class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest {
import CreateTopicsRequestWithPolicyTest._
- override def propertyOverrides(properties: Properties): Unit = {
- super.propertyOverrides(properties)
+ override def brokerPropertyOverrides(properties: Properties): Unit = {
+ super.brokerPropertyOverrides(properties)
properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[Policy].getName)
}
@@ -94,7 +94,7 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
Some("Topic 'existing-topic' already exists."))))
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
- numPartitions = 10, replicationFactor = numBrokers + 1)), validateOnly = true),
+ numPartitions = 10, replicationFactor = brokerCount + 1)), validateOnly = true),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
Some("Replication factor: 4 larger than available brokers: 3."))))
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index 6d56a02..0a7e194 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -29,7 +29,7 @@ import scala.concurrent.ExecutionException
class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
var adminClient: AdminClient = null
- override def numBrokers = 1
+ override def brokerCount = 1
@Before
override def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index 6f79a9a..ae65016 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -38,7 +38,7 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
var adminClient: AdminClient = null
- override def numBrokers = 1
+ override def brokerCount = 1
@Before
override def setUp(): Unit = {
@@ -47,11 +47,11 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
}
override def generateConfigs = {
- val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+ val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect,
enableControlledShutdown = false,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
- props.foreach(propertyOverrides)
+ props.foreach(brokerPropertyOverrides)
props.map(KafkaConfig.fromProps)
}
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index 1203f83..7de624f 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -36,7 +36,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
var adminClient: AdminClient = null
- override def numBrokers = 1
+ override def brokerCount = 1
@Before
override def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
index 7240d77..6a01165 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
@@ -29,14 +29,14 @@ import java.util.Collections
class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
- override def numBrokers: Int = 1
+ override def brokerCount: Int = 1
override def generateConfigs = {
- val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+ val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect,
enableControlledShutdown = false, enableDeleteTopic = false,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
- props.foreach(propertyOverrides)
+ props.foreach(brokerPropertyOverrides)
props.map(KafkaConfig.fromProps)
}
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index 5a0244b..a1e4c73 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -27,7 +27,7 @@ import java.io.File
class DescribeLogDirsRequestTest extends BaseRequestTest {
override val logDirCount = 2
- override val numBrokers: Int = 1
+ override val brokerCount: Int = 1
val topic = "topic"
val partitionNum = 2
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index 673abe6..4ae7d93 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -32,7 +32,7 @@ import org.junit.Test
class FetchRequestDownConversionConfigTest extends BaseRequestTest {
private var producer: KafkaProducer[String, String] = null
- override def numBrokers: Int = 1
+ override def brokerCount: Int = 1
override def setUp(): Unit = {
super.setUp()
@@ -45,8 +45,8 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
super.tearDown()
}
- override protected def propertyOverrides(properties: Properties): Unit = {
- super.propertyOverrides(properties)
+ override protected def brokerPropertyOverrides(properties: Properties): Unit = {
+ super.brokerPropertyOverrides(properties)
properties.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
index 30f3b23..f3580cf 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
@@ -36,9 +36,9 @@ import java.util.concurrent.atomic.AtomicInteger
*/
class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {
- override def numBrokers: Int = 1
+ override def brokerCount: Int = 1
- override def propertyOverrides(properties: Properties): Unit = {
+ override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.MetricReporterClassesProp, classOf[KafkaMetricReporterExceptionHandlingTest.BadReporter].getName + "," + classOf[KafkaMetricReporterExceptionHandlingTest.GoodReporter].getName)
}
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index d56a9f0..3eff38f 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -41,7 +41,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
val producerCount: Int = 1
val consumerCount: Int = 1
- val serverCount: Int = 2
+ val brokerCount: Int = 2
private val topic = "topic"
private val partitionNum = 12
override val logDirCount = 3
@@ -52,7 +52,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
@Before
override def setUp() {
super.setUp()
- createTopic(topic, partitionNum, serverCount)
+ createTopic(topic, partitionNum, brokerCount)
}
@Test
@@ -71,7 +71,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
var server: KafkaServer = null
try {
- val props = TestUtils.createBrokerConfig(serverCount, zkConnect, logDirCount = 3)
+ val props = TestUtils.createBrokerConfig(brokerCount, zkConnect, logDirCount = 3)
props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.11.0")
props.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0")
val kafkaConfig = KafkaConfig.fromProps(props)
@@ -118,7 +118,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
// has fetched from the leader and attempts to append to the offline replica.
producer.send(record).get
- assertEquals(serverCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size)
+ assertEquals(brokerCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size)
followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread =>
assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete)
}
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 04b3467..7b52e7b 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -38,11 +38,11 @@ class LogOffsetTest extends BaseRequestTest {
private lazy val time = new MockTime
- protected override def numBrokers = 1
+ override def brokerCount = 1
protected override def brokerTime(brokerId: Int) = time
- protected override def propertyOverrides(props: Properties): Unit = {
+ protected override def brokerPropertyOverrides(props: Properties): Unit = {
props.put("log.flush.interval.messages", "1")
props.put("num.partitions", "20")
props.put("log.retention.hours", "10")
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index bde16b6..f920d94 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -34,7 +34,7 @@ import scala.collection.JavaConverters._
class MetadataRequestTest extends BaseRequestTest {
- override def propertyOverrides(properties: Properties) {
+ override def brokerPropertyOverrides(properties: Properties) {
properties.setProperty(KafkaConfig.DefaultReplicationFactorProp, "2")
properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}")
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 671f9e0..d04f39f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -49,7 +49,7 @@ import scala.collection.mutable.ListBuffer
class RequestQuotaTest extends BaseRequestTest {
- override def numBrokers: Int = 1
+ override def brokerCount: Int = 1
private val topic = "topic-1"
private val numPartitions = 1
@@ -66,7 +66,7 @@ class RequestQuotaTest extends BaseRequestTest {
private val executor = Executors.newCachedThreadPool
private val tasks = new ListBuffer[Task]
- override def propertyOverrides(properties: Properties): Unit = {
+ override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 185a2f4..7dcc96b 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -36,7 +36,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup {
private val kafkaServerSaslMechanisms = List("PLAIN")
protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
- override def numBrokers = 1
+ override def brokerCount = 1
@Before
override def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
index 2d0f1db..f246b25 100644
--- a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
@@ -28,7 +28,7 @@ import collection.JavaConverters._
class StopReplicaRequestTest extends BaseRequestTest {
override val logDirCount = 2
- override val numBrokers: Int = 1
+ override val brokerCount: Int = 1
val topic = "topic"
val partitionNum = 2