You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/05 20:08:16 UTC

[kafka] branch trunk updated: KAFKA-7893; Refactor ConsumerBounceTest to reuse functionality from BaseConsumerTest (#6238)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cc4fde3  KAFKA-7893; Refactor ConsumerBounceTest to reuse functionality from BaseConsumerTest (#6238)
cc4fde3 is described below

commit cc4fde35c9cc2818af1bcb6861ce32dee0f41677
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Fri Apr 5 13:08:04 2019 -0700

    KAFKA-7893; Refactor ConsumerBounceTest to reuse functionality from BaseConsumerTest (#6238)
    
    This PR should help address the flakiness in the ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup test (https://issues.apache.org/jira/browse/KAFKA-7965). I tested this locally and have verified it significantly reduces flakiness - 25/25 tests now pass. Running the test 25 times in trunk, I'd get `18/25` passes.
    
    It does so by reusing the less-flaky consumer integration testing functionality inside `BaseConsumerTest`. Most notably, the test now makes use of the `ConsumerAssignmentPoller` class  - each consumer now polls non-stop rather than the more batch-oriented polling we had in `ConsumerBounceTest#waitForRebalance()`.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/api/AdminClientIntegrationTest.scala     |  34 +--
 .../kafka/api/AuthorizerIntegrationTest.scala      |   4 +-
 .../integration/kafka/api/BaseConsumerTest.scala   | 172 ++++++++++++--
 .../integration/kafka/api/BaseQuotaTest.scala      |   4 +-
 .../integration/kafka/api/ConsumerBounceTest.scala | 250 +++++++--------------
 .../kafka/api/CustomQuotaCallbackTest.scala        |   4 +-
 .../api/DescribeAuthorizedOperationsTest.scala     |   2 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |   2 +-
 .../kafka/api/GroupAuthorizerIntegrationTest.scala |   4 +-
 .../kafka/api/IntegrationTestHarness.scala         |  18 +-
 .../kafka/api/LegacyAdminClientTest.scala          |   4 +-
 .../integration/kafka/api/LogAppendTimeTest.scala  |   2 +-
 .../scala/integration/kafka/api/MetricsTest.scala  |   2 +-
 .../kafka/api/PlaintextConsumerTest.scala          | 175 +++++----------
 .../SaslClientsWithInvalidCredentialsTest.scala    |   4 +-
 .../kafka/api/SaslPlainPlaintextConsumerTest.scala |   7 +-
 .../kafka/network/DynamicConnectionQuotaTest.scala |   8 +-
 .../kafka/server/GssapiAuthenticationTest.scala    |   4 +-
 .../kafka/server/ScramServerStartupTest.scala      |   2 +-
 .../scala/unit/kafka/admin/AddPartitionsTest.scala |   2 +-
 .../kafka/admin/DelegationTokenCommandTest.scala   |   6 +-
 .../server/AbstractCreateTopicsRequestTest.scala   |   2 +-
 .../server/AddPartitionsToTxnRequestTest.scala     |   2 +-
 .../server/AlterReplicaLogDirsRequestTest.scala    |   2 +-
 .../unit/kafka/server/ApiVersionsRequestTest.scala |   2 +-
 .../scala/unit/kafka/server/BaseRequestTest.scala  |  20 +-
 .../kafka/server/CreateTopicsRequestTest.scala     |   4 +-
 .../server/CreateTopicsRequestWithPolicyTest.scala |   6 +-
 .../DelegationTokenRequestsOnPlainTextTest.scala   |   2 +-
 .../kafka/server/DelegationTokenRequestsTest.scala |   6 +-
 ...nTokenRequestsWithDisableTokenFeatureTest.scala |   2 +-
 ...leteTopicsRequestWithDeletionDisabledTest.scala |   6 +-
 .../kafka/server/DescribeLogDirsRequestTest.scala  |   2 +-
 .../FetchRequestDownConversionConfigTest.scala     |   6 +-
 .../KafkaMetricReporterExceptionHandlingTest.scala |   4 +-
 .../unit/kafka/server/LogDirFailureTest.scala      |   8 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   4 +-
 .../unit/kafka/server/MetadataRequestTest.scala    |   2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   4 +-
 .../kafka/server/SaslApiVersionsRequestTest.scala  |   2 +-
 .../unit/kafka/server/StopReplicaRequestTest.scala |   2 +-
 41 files changed, 393 insertions(+), 405 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 5c72cbf..dbb6213 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -87,12 +87,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     super.tearDown()
   }
 
-  val serverCount = 3
+  val brokerCount = 3
   val consumerCount = 1
   val producerCount = 1
 
   override def generateConfigs = {
-    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = 2)
     cfgs.foreach { config =>
       config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
@@ -197,7 +197,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       assertEquals(3, partition.replicas.size)
       partition.replicas.asScala.foreach { replica =>
         assertTrue(replica.id >= 0)
-        assertTrue(replica.id < serverCount)
+        assertTrue(replica.id < brokerCount)
       }
       assertEquals("No duplicate replica ids", partition.replicas.size, partition.replicas.asScala.map(_.id).distinct.size)
 
@@ -301,10 +301,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val topic = "topic"
     val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1)
     val partitionsByBroker = leaderByPartition.groupBy { case (partitionId, leaderId) => leaderId }.mapValues(_.keys.toSeq)
-    val brokers = (0 until serverCount).map(Integer.valueOf)
+    val brokers = (0 until brokerCount).map(Integer.valueOf)
     val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get
 
-    (0 until serverCount).foreach { brokerId =>
+    (0 until brokerCount).foreach { brokerId =>
       val server = servers.find(_.config.brokerId == brokerId).get
       val expectedPartitions = partitionsByBroker(brokerId)
       val logDirInfos = logDirInfosByBroker.get(brokerId)
@@ -361,7 +361,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       assertTrue(exception.getCause.isInstanceOf[UnknownTopicOrPartitionException])
     }
 
-    createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
     servers.foreach { server =>
       val logDir = server.logManager.getLog(tp).get.dir.getParent
       assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
@@ -759,7 +759,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testSeekAfterDeleteRecords(): Unit = {
-    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = AdminClient.create(createConfig)
 
@@ -788,7 +788,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testLogStartOffsetCheckpoint(): Unit = {
-    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = AdminClient.create(createConfig)
 
@@ -801,7 +801,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     var lowWatermark: Option[Long] = Some(result.lowWatermarks.get(topicPartition).get.lowWatermark)
     assertEquals(Some(5), lowWatermark)
 
-    for (i <- 0 until serverCount) {
+    for (i <- 0 until brokerCount) {
       killBroker(i)
     }
     restartDeadBrokers()
@@ -828,7 +828,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testLogStartOffsetAfterDeleteRecords(): Unit = {
-    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = AdminClient.create(createConfig)
 
@@ -842,13 +842,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark
     assertEquals(3L, lowWatermark)
 
-    for (i <- 0 until serverCount)
+    for (i <- 0 until brokerCount)
       assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
   }
 
   @Test
   def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
-    val leaders = createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    val leaders = createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
     val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
 
     def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
@@ -881,7 +881,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L)
 
     // after the new replica caught up, all replicas should have same log start offset
-    for (i <- 0 until serverCount)
+    for (i <- 0 until brokerCount)
       assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
 
     // kill the same follower again, produce more records, and delete records beyond follower's LOE
@@ -896,7 +896,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testAlterLogDirsAfterDeleteRecords(): Unit = {
     client = AdminClient.create(createConfig)
-    createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
     val expectedLEO = 100
     val producer = createProducer()
     sendRecords(producer, expectedLEO, topicPartition)
@@ -905,7 +905,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
     result.all().get()
     // make sure we are in the expected state after delete records
-    for (i <- 0 until serverCount) {
+    for (i <- 0 until brokerCount) {
       assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
       assertEquals(expectedLEO, servers(i).replicaManager.localReplica(topicPartition).get.logEndOffset)
     }
@@ -927,7 +927,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testOffsetsForTimesAfterDeleteRecords(): Unit = {
-    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
 
     client = AdminClient.create(createConfig)
 
@@ -999,7 +999,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testDescribeConfigsForTopic(): Unit = {
-    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)
     client = AdminClient.create(createConfig)
 
     val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a7cb654..336bfb1 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -54,7 +54,7 @@ import scala.collection.mutable.Buffer
 
 class AuthorizerIntegrationTest extends BaseRequestTest {
 
-  override def numBrokers: Int = 1
+  override def brokerCount: Int = 1
   val brokerId: Integer = 0
   def userPrincipal = KafkaPrincipal.ANONYMOUS
 
@@ -103,7 +103,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000")
   consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
 
-  override def propertyOverrides(properties: Properties): Unit = {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
     properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
     properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
     properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 1645b5b..6b8b502 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -14,13 +14,14 @@ package kafka.api
 
 import java.time.Duration
 import java.util
+import java.util.Properties
 
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import kafka.utils.{ShutdownableThread, TestUtils}
-import kafka.server.KafkaConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
@@ -30,43 +31,49 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.internals.Topic
 
+import scala.collection.mutable
+
 /**
  * Integration tests for the consumer that cover basic usage as well as server failures
  */
-abstract class BaseConsumerTest extends IntegrationTestHarness {
+abstract class BaseConsumerTest extends BaseRequestTest {
 
   val epsilon = 0.1
-  val serverCount = 3
+  override def brokerCount: Int = 3
 
   val topic = "topic"
   val part = 0
   val tp = new TopicPartition(topic, part)
   val part2 = 1
   val tp2 = new TopicPartition(topic, part2)
+  val group = "my-test"
   val producerClientId = "ConsumerTestProducer"
   val consumerClientId = "ConsumerTestConsumer"
+  val groupMaxSessionTimeoutMs = 30000L
 
-  // configure the servers and clients
-  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
-  this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
-  this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
   this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
-  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
   this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
 
+  override protected def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+    properties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+    properties.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+    properties.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
+    properties.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, groupMaxSessionTimeoutMs.toString)
+    properties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10")
+  }
+
   @Before
   override def setUp() {
     super.setUp()
 
     // create the test topic with all the brokers as replicas
-    createTopic(topic, 2, serverCount)
+    createTopic(topic, 2, brokerCount)
   }
 
   @Test
@@ -90,7 +97,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
   @Test
   def testCoordinatorFailover() {
     val listener = new TestConsumerReassignmentListener()
-    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000")
     val consumer = createConsumer()
 
@@ -130,6 +137,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     }
   }
 
+  protected def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
+    val groupOverrideConfig = new Properties
+    groupOverrideConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+    createConsumer(configOverrides = groupOverrideConfig)
+  }
+
   protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
                             tp: TopicPartition): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
     val records = (0 until numRecords).map { i =>
@@ -223,6 +236,100 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     assertEquals(None, commitCallback.error)
   }
 
+  /**
+    * Create 'numOfConsumersToAdd' consumers add then to the consumer group 'consumerGroup', and create corresponding
+    * pollers for these consumers. Wait for partition re-assignment and validate.
+    *
+    * Currently, assignment validation requires that total number of partitions is greater or equal to
+    * number of consumers, so subscriptions.size must be greater or equal the resulting number of consumers in the group
+    *
+    * @param numOfConsumersToAdd number of consumers to create and add to the consumer group
+    * @param consumerGroup current consumer group
+    * @param consumerPollers current consumer pollers
+    * @param topicsToSubscribe topics to which new consumers will subscribe to
+    * @param subscriptions set of all topic partitions
+    */
+  def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int,
+                                                   consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
+                                                   consumerPollers: mutable.Buffer[ConsumerAssignmentPoller],
+                                                   topicsToSubscribe: List[String],
+                                                   subscriptions: Set[TopicPartition],
+                                                   group: String = group): (mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], mutable.Buffer[ConsumerAssignmentPoller]) = {
+    assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
+    addConsumersToGroup(numOfConsumersToAdd, consumerGroup, consumerPollers, topicsToSubscribe, subscriptions, group)
+    // wait until topics get re-assigned and validate assignment
+    validateGroupAssignment(consumerPollers, subscriptions)
+
+    (consumerGroup, consumerPollers)
+  }
+
+  /**
+    * Create 'numOfConsumersToAdd' consumers add then to the consumer group 'consumerGroup', and create corresponding
+    * pollers for these consumers.
+    *
+    *
+    * @param numOfConsumersToAdd number of consumers to create and add to the consumer group
+    * @param consumerGroup current consumer group
+    * @param consumerPollers current consumer pollers
+    * @param topicsToSubscribe topics to which new consumers will subscribe to
+    * @param subscriptions set of all topic partitions
+    */
+  def addConsumersToGroup(numOfConsumersToAdd: Int,
+                          consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
+                          consumerPollers: mutable.Buffer[ConsumerAssignmentPoller],
+                          topicsToSubscribe: List[String],
+                          subscriptions: Set[TopicPartition],
+                          group: String = group): (mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], mutable.Buffer[ConsumerAssignmentPoller]) = {
+    for (_ <- 0 until numOfConsumersToAdd) {
+      val consumer = createConsumerWithGroupId(group)
+      consumerGroup += consumer
+      consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
+    }
+
+    (consumerGroup, consumerPollers)
+  }
+
+  /**
+    * Wait for consumers to get partition assignment and validate it.
+    *
+    * @param consumerPollers consumer pollers corresponding to the consumer group we are testing
+    * @param subscriptions set of all topic partitions
+    * @param msg message to print when waiting for/validating assignment fails
+    */
+  def validateGroupAssignment(consumerPollers: mutable.Buffer[ConsumerAssignmentPoller],
+                              subscriptions: Set[TopicPartition],
+                              msg: Option[String] = None,
+                              waitTime: Long = 10000L): Unit = {
+    val assignments = mutable.Buffer[Set[TopicPartition]]()
+    TestUtils.waitUntilTrue(() => {
+      assignments.clear()
+      consumerPollers.foreach(assignments += _.consumerAssignment())
+      isPartitionAssignmentValid(assignments, subscriptions)
+    }, msg.getOrElse(s"Did not get valid assignment for partitions $subscriptions. Instead, got $assignments"), waitTime)
+  }
+
+  /**
+    * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates
+    * consumer poller and starts polling.
+    * Assumes that the consumer is not subscribed to any topics yet
+    *
+    * @param consumer consumer
+    * @param topicsToSubscribe topics that this consumer will subscribe to
+    * @return consumer poller for the given consumer
+    */
+  def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]],
+                                       topicsToSubscribe: List[String],
+                                       partitionsToAssign: Set[TopicPartition] = Set.empty[TopicPartition]): ConsumerAssignmentPoller = {
+    assertEquals(0, consumer.assignment().size)
+    val consumerPoller = if (topicsToSubscribe.nonEmpty)
+      new ConsumerAssignmentPoller(consumer, topicsToSubscribe)
+    else
+      new ConsumerAssignmentPoller(consumer, partitionsToAssign)
+
+    consumerPoller.start()
+    consumerPoller
+  }
+
   protected def awaitRebalance(consumer: Consumer[_, _], rebalanceListener: TestConsumerReassignmentListener): Unit = {
     val numReassignments = rebalanceListener.callsToAssigned
     TestUtils.pollUntilTrue(consumer, () => rebalanceListener.callsToAssigned > numReassignments,
@@ -253,13 +360,25 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
   }
 
   protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]],
-                                           topicsToSubscribe: List[String]) extends ShutdownableThread("daemon-consumer-assignment", false)
+                                           topicsToSubscribe: List[String],
+                                           partitionsToAssign: Set[TopicPartition]) extends ShutdownableThread("daemon-consumer-assignment", false)
   {
-    @volatile private var partitionAssignment: Set[TopicPartition] = Set.empty[TopicPartition]
-    private var topicsSubscription = topicsToSubscribe
+    def this(consumer: Consumer[Array[Byte], Array[Byte]], topicsToSubscribe: List[String]) {
+      this(consumer, topicsToSubscribe, Set.empty[TopicPartition])
+    }
+
+    def this(consumer: Consumer[Array[Byte], Array[Byte]], partitionsToAssign: Set[TopicPartition]) {
+      this(consumer, List.empty[String], partitionsToAssign)
+    }
+
+    @volatile var thrownException: Option[Throwable] = None
+    @volatile var receivedMessages = 0
+
+    @volatile private var partitionAssignment: Set[TopicPartition] = partitionsToAssign
     @volatile private var subscriptionChanged = false
+    private var topicsSubscription = topicsToSubscribe
 
-    val rebalanceListener = new ConsumerRebalanceListener {
+    val rebalanceListener: ConsumerRebalanceListener = new ConsumerRebalanceListener {
       override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
         partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*)
       }
@@ -268,7 +387,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
         partitionAssignment = Set.empty[TopicPartition]
       }
     }
-    consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
+    if (partitionAssignment.isEmpty) {
+      consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
+    } else {
+      consumer.assign(partitionAssignment.asJava)
+    }
 
     def consumerAssignment(): Set[TopicPartition] = {
       partitionAssignment
@@ -285,14 +408,16 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
      * @param newTopicsToSubscribe
      */
     def subscribe(newTopicsToSubscribe: List[String]): Unit = {
-      if (subscriptionChanged) {
+      if (subscriptionChanged)
         throw new IllegalStateException("Do not call subscribe until the previous subscribe request is processed.")
-      }
+      if (partitionsToAssign.nonEmpty)
+        throw new IllegalStateException("Cannot call subscribe when configured to use manual partition assignment")
+
       topicsSubscription = newTopicsToSubscribe
       subscriptionChanged = true
     }
 
-    def isSubscribeRequestProcessed(): Boolean = {
+    def isSubscribeRequestProcessed: Boolean = {
       !subscriptionChanged
     }
 
@@ -308,9 +433,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
         subscriptionChanged = false
       }
       try {
-        consumer.poll(Duration.ofMillis(50))
+        receivedMessages += consumer.poll(Duration.ofMillis(50)).count()
       } catch {
         case _: WakeupException => // ignore for shutdown
+        case e: Throwable =>
+          thrownException = Some(e)
+          throw e
       }
     }
   }
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 4b278f0..d9bc646 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
 
 abstract class BaseQuotaTest extends IntegrationTestHarness {
 
-  override val serverCount = 2
+  override val brokerCount = 2
 
   protected def producerClientId = "QuotasTestProducer-1"
   protected def consumerClientId = "QuotasTestConsumer-1"
@@ -70,7 +70,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     super.setUp()
 
     val numPartitions = 1
-    val leaders = createTopic(topic1, numPartitions, serverCount)
+    val leaders = createTopic(topic1, numPartitions, brokerCount)
     leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1)
     followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1)
     quotaTestClients = createQuotaTestClients(topic1, leaderNode)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 1a1f37e..38650b4 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -15,12 +15,10 @@ package kafka.api
 
 import java.time
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantLock
 import java.util.{Collection, Collections, Properties}
 
 import util.control.Breaks._
-import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, Logging, ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -29,25 +27,23 @@ import org.apache.kafka.common.errors.GroupMaxSizeReachedException
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
 import org.junit.Assert._
-import org.junit.{After, Before, Ignore, Test}
+import org.junit.{After, Ignore, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future => SFuture}
+import scala.collection.mutable
 
 /**
  * Integration tests for the consumer that cover basic usage as well as server failures
  */
-class ConsumerBounceTest extends BaseRequestTest with Logging {
-  val topic = "topic"
-  val part = 0
-  val tp = new TopicPartition(topic, part)
+class ConsumerBounceTest extends BaseConsumerTest with Logging {
   val maxGroupSize = 5
 
   // Time to process commit and leave group requests in tests when brokers are available
-  val gracefulCloseTimeMs = 1000
-  val executor = Executors.newScheduledThreadPool(2)
+  val gracefulCloseTimeMs = Some(1000L)
+  val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
+  val consumerPollers: mutable.Buffer[ConsumerAssignmentPoller] = mutable.Buffer[ConsumerAssignmentPoller]()
+
+  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
 
   override def generateConfigs = {
     generateKafkaConfigs()
@@ -63,21 +59,14 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     properties.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
     properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
 
-    FixedPortTestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)
+    FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)
       .map(KafkaConfig.fromProps(_, properties))
   }
 
-  @Before
-  override def setUp() {
-    super.setUp()
-
-    // create the test topic with all the brokers as replicas
-    createTopic(topic, 1, numBrokers)
-  }
-
   @After
   override def tearDown() {
     try {
+      consumerPollers.foreach(_.shutdown())
       executor.shutdownNow()
       // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread
       assertTrue("Executor did not terminate", executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
@@ -176,7 +165,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     val consumer = createConsumer()
     consumer.subscribe(Collections.singleton(newtopic))
     executor.schedule(new Runnable {
-        def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers)
+        def run() = createTopic(newtopic, numPartitions = brokerCount, replicationFactor = brokerCount)
       }, 2, TimeUnit.SECONDS)
     consumer.poll(time.Duration.ZERO)
 
@@ -201,18 +190,22 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
       assertEquals(0, remainingRecords)
     }
 
+    val poller = new ConsumerAssignmentPoller(consumer, List(newtopic))
+    consumerPollers += poller
+    poller.start()
     sendRecords(numRecords, newtopic)
-    receiveRecords(consumer, numRecords, 10000)
+    receiveExactRecords(poller, numRecords, 10000)
+    poller.shutdown()
 
     servers.foreach(server => killBroker(server.config.brokerId))
     Thread.sleep(500)
     restartDeadBrokers()
 
-    val future = executor.submit(new Runnable {
-      def run() = receiveRecords(consumer, numRecords, 10000)
-    })
+    val poller2 = new ConsumerAssignmentPoller(consumer, List(newtopic))
+    consumerPollers += poller2
+    poller2.start()
     sendRecords(numRecords, newtopic)
-    future.get
+    receiveExactRecords(poller, numRecords, 10000L)
   }
 
   @Test
@@ -233,7 +226,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
    */
   private def checkCloseGoodPath(numRecords: Int, groupId: String) {
     val consumer = createConsumerAndReceive(groupId, false, numRecords)
-    val future = submitCloseAndValidate(consumer, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+    val future = submitCloseAndValidate(consumer, Long.MaxValue, None, gracefulCloseTimeMs)
     future.get
     checkClosedState(groupId, numRecords)
   }
@@ -251,8 +244,8 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     killBroker(findCoordinator(dynamicGroup))
     killBroker(findCoordinator(manualGroup))
 
-    val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
-    val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+    val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs)
+    val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, gracefulCloseTimeMs)
     future1.get
     future2.get
 
@@ -302,6 +295,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     */
   @Test
   def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
+    val group = "group-max-size-test"
     val topic = "group-max-size-test"
     val maxGroupSize = 2
     val consumerCount = maxGroupSize + 1
@@ -311,77 +305,51 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
       // ensure even record distribution per partition
       recordsProduced += partitionCount - recordsProduced % partitionCount
     }
-    val executor = Executors.newScheduledThreadPool(consumerCount * 2)
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-    val producer = createProducer()
-    createTopic(topic, numPartitions = partitionCount, replicationFactor = numBrokers)
-    val stableConsumers = createConsumersWithGroupId("group2", consumerCount, executor, topic = topic)
+    val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount)
 
-    // assert group is stable and working
-    sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount))
-    stableConsumers.foreach { cons => {
-      receiveAndCommit(cons, recordsProduced / consumerCount, 10000)
-    }}
+    addConsumersToGroupAndWaitForGroupAssignment(consumerCount, mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](),
+      consumerPollers, List[String](topic), partitions, group)
 
     // roll all brokers with a lesser max group size to make sure coordinator has the new config
     val newConfigs = generateKafkaConfigs(maxGroupSize.toString)
-    val kickedConsumerOut = new AtomicBoolean(false)
     var kickedOutConsumerIdx: Option[Int] = None
-    val lock = new ReentrantLock
     // restart brokers until the group moves to a Coordinator with the new config
     breakable { for (broker <- servers.indices) {
       killBroker(broker)
-      sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount))
-
-      var successfulConsumes = 0
-
-      // compute consumptions in a non-blocking way in order to account for the rebalance once the group.size takes effect
-      val consumeFutures = new ArrayBuffer[SFuture[Any]]
-      implicit val executorContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
-      stableConsumers.indices.foreach(idx => {
-        val currentConsumer = stableConsumers(idx)
-        val consumeFuture = SFuture {
-          try {
-            receiveAndCommit(currentConsumer, recordsProduced / consumerCount, 10000)
-            CoreUtils.inLock(lock) { successfulConsumes += 1 }
-          } catch {
-            case e: Throwable =>
-              if (!e.isInstanceOf[GroupMaxSizeReachedException]) {
-                throw e
-              }
-              if (!kickedConsumerOut.compareAndSet(false, true)) {
-                fail(s"Received more than one ${classOf[GroupMaxSizeReachedException]}")
-              }
-              kickedOutConsumerIdx = Some(idx)
-          }
+      consumerPollers.indices.foreach(idx => {
+        consumerPollers(idx).thrownException match {
+          case Some(thrownException) =>
+            if (!thrownException.isInstanceOf[GroupMaxSizeReachedException]) {
+              throw thrownException
+            }
+            if (kickedOutConsumerIdx.isDefined) {
+              fail(s"Received more than one ${classOf[GroupMaxSizeReachedException]}")
+            }
+            kickedOutConsumerIdx = Some(idx)
+          case None =>
         }
-
-        consumeFutures += consumeFuture
       })
-      Await.result(SFuture.sequence(consumeFutures), Duration("12sec"))
 
-      if (kickedConsumerOut.get()) {
-        // validate the rest N-1 consumers consumed successfully
-        assertEquals(maxGroupSize, successfulConsumes)
+      if (kickedOutConsumerIdx.isDefined)
         break
-      }
 
       val config = newConfigs(broker)
       servers(broker) = TestUtils.createServer(config, time = brokerTime(config.brokerId))
       restartDeadBrokers()
     }}
-    if (!kickedConsumerOut.get())
+    if (kickedOutConsumerIdx.isEmpty)
       fail(s"Should have received an ${classOf[GroupMaxSizeReachedException]} during the cluster roll")
+    restartDeadBrokers()
 
     // assert that the group has gone through a rebalance and shed off one consumer
-    stableConsumers.remove(kickedOutConsumerIdx.get)
-    sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount))
-    // should be only maxGroupSize consumers left in the group
-    stableConsumers.foreach { cons => {
-      receiveAndCommit(cons, recordsProduced / maxGroupSize, 10000)
-    }}
+    consumerPollers.remove(kickedOutConsumerIdx.get).shutdown()
+    sendRecords(createProducer(), recordsProduced, topic, numPartitions = Some(partitionCount))
+    TestUtils.waitUntilTrue(() => {
+      consumerPollers.forall(p => p.receivedMessages >= recordsProduced / consumerCount)
+    }, "The remaining consumers in the group could not fetch the expected records", 10000L)
   }
 
   /**
@@ -389,70 +357,30 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     */
   @Test
   def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(): Unit = {
-    val topic = "group-max-size-test"
-    val groupId = "group1"
-    val executor = Executors.newScheduledThreadPool(maxGroupSize * 2)
-    createTopic(topic, maxGroupSize, numBrokers)
+    val group = "fatal-exception-test"
+    val topic = "fatal-exception-test"
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
 
+    val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount)
+
     // Create N+1 consumers in the same consumer group and assert that the N+1th consumer receives a fatal error when it tries to join the group
-    val stableConsumers = createConsumersWithGroupId(groupId, maxGroupSize, executor, topic)
-    val newConsumer = createConsumerWithGroupId(groupId)
-    var failedRebalance = false
-    var exception: Exception = null
-    waitForRebalance(5000, subscribeAndPoll(newConsumer, executor = executor, onException = e => {failedRebalance = true; exception = e}),
-      executor = executor, stableConsumers:_*)
-    assertTrue("Rebalance did not fail as expected", failedRebalance)
-    assertTrue(exception.isInstanceOf[GroupMaxSizeReachedException])
+    addConsumersToGroupAndWaitForGroupAssignment(maxGroupSize, mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](),
+      consumerPollers, List[String](topic), partitions, group)
+    val (_, rejectedConsumerPollers) = addConsumersToGroup(1,
+      mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](), mutable.Buffer[ConsumerAssignmentPoller](), List[String](topic), partitions, group)
+    val rejectedConsumer = rejectedConsumerPollers.head
+    TestUtils.waitUntilTrue(() => {
+      rejectedConsumer.thrownException.isDefined
+    }, "Extra consumer did not throw an exception")
+    assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException])
 
     // assert group continues to live
-    val producer = createProducer()
-    sendRecords(producer, maxGroupSize * 100, topic, numPartitions = Some(maxGroupSize))
-    stableConsumers.foreach { cons => {
-        receiveExactRecords(cons, 100, 10000)
-    }}
-  }
-
-  /**
-    * Creates N consumers with the same group ID and ensures the group rebalances properly at each step
-    */
-  private def createConsumersWithGroupId(groupId: String, consumerCount: Int, executor: ExecutorService, topic: String): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = {
-    val stableConsumers = ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-    for (_ <- 1.to(consumerCount)) {
-      val newConsumer = createConsumerWithGroupId(groupId)
-      waitForRebalance(5000, subscribeAndPoll(newConsumer, executor = executor, topic = topic),
-        executor = executor, stableConsumers:_*)
-      stableConsumers += newConsumer
-    }
-    stableConsumers
-  }
-
-  def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], executor: ExecutorService, revokeSemaphore: Option[Semaphore] = None,
-                       onException: Exception => Unit = e => { throw e }, topic: String = topic, pollTimeout: Int = 1000): Future[Any] = {
-    executor.submit(CoreUtils.runnable {
-      try {
-        consumer.subscribe(Collections.singletonList(topic))
-        consumer.poll(java.time.Duration.ofMillis(pollTimeout))
-      } catch {
-        case e: Exception => onException.apply(e)
-      }
-    }, 0)
-  }
-
-  def waitForRebalance(timeoutMs: Long, future: Future[Any], executor: ExecutorService, otherConsumers: KafkaConsumer[Array[Byte], Array[Byte]]*) {
-    val startMs = System.currentTimeMillis
-    implicit val executorContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
-
-    while (System.currentTimeMillis < startMs + timeoutMs && !future.isDone) {
-      val consumeFutures = otherConsumers.map(consumer => SFuture {
-        consumer.poll(time.Duration.ofMillis(1000))
-      })
-      Await.result(SFuture.sequence(consumeFutures), Duration("1500ms"))
-    }
-
-    assertTrue("Rebalance did not complete in time", future.isDone)
+    sendRecords(createProducer(), maxGroupSize * 100, topic, numPartitions = Some(partitions.size))
+    TestUtils.waitUntilTrue(() => {
+      consumerPollers.forall(p => p.receivedMessages >= 100)
+    }, "The consumers in the group could not fetch the expected records", 10000L)
   }
 
   /**
@@ -463,7 +391,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
   @Test
   def testCloseDuringRebalance() {
     val topic = "closetest"
-    createTopic(topic, 10, numBrokers)
+    createTopic(topic, 10, brokerCount)
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -510,7 +438,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     val rebalanceFuture = createConsumerToRebalance()
 
     // consumer1 should leave group and close immediately even though rebalance is in progress
-    val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+    val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs)
 
     // Rebalance should complete without waiting for consumer1 to timeout since consumer1 has left the group
     waitForRebalance(2000, rebalanceFuture, consumer2)
@@ -528,40 +456,22 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     closeFuture2.get(2000, TimeUnit.MILLISECONDS)
   }
 
-  private def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-    createConsumer()
-  }
-
   private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, numRecords: Int): KafkaConsumer[Array[Byte], Array[Byte]] = {
     val consumer = createConsumerWithGroupId(groupId)
-    if (manualAssign)
-      consumer.assign(Collections.singleton(tp))
-    else
-      consumer.subscribe(Collections.singleton(topic))
-    receiveExactRecords(consumer, numRecords)
-    consumer
-  }
-
-  private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long = 60000): Long = {
-    var received = 0L
-    val endTimeMs = System.currentTimeMillis + timeoutMs
-    while (received < numRecords && System.currentTimeMillis < endTimeMs)
-      received += consumer.poll(time.Duration.ofMillis(100)).count()
-
-    received
-  }
+    val consumerPoller = if (manualAssign)
+        subscribeConsumerAndStartPolling(consumer, List(), Set(tp))
+      else
+        subscribeConsumerAndStartPolling(consumer, List(topic))
 
-  private def receiveExactRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long = 60000): Unit = {
-    val received = receiveRecords(consumer, numRecords, timeoutMs)
-    assertEquals(numRecords, received)
+    receiveExactRecords(consumerPoller, numRecords)
+    consumerPoller.shutdown()
+    consumer
   }
 
-  @throws(classOf[CommitFailedException])
-  private def receiveAndCommit(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long): Unit = {
-    val received = receiveRecords(consumer, numRecords, timeoutMs)
-    assertTrue(s"Received $received, expected at least $numRecords", numRecords <= received)
-    consumer.commitSync()
+  private def receiveExactRecords(consumer: ConsumerAssignmentPoller, numRecords: Int, timeoutMs: Long = 60000): Unit = {
+    TestUtils.waitUntilTrue(() => {
+      consumer.receivedMessages == numRecords
+    }, s"Consumer did not receive expected $numRecords. It received ${consumer.receivedMessages}", timeoutMs)
   }
 
   private def submitCloseAndValidate(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
@@ -617,6 +527,12 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     }
   }
 
+  private def createTopicPartitions(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
+                                    topicConfig: Properties = new Properties): Set[TopicPartition] = {
+    createTopic(topic, numPartitions = numPartitions, replicationFactor = replicationFactor, topicConfig = topicConfig)
+    Range(0, numPartitions).map(part => new TopicPartition(topic, part)).toSet
+  }
+
   private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
                           numRecords: Int,
                           topic: String = this.topic,
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 8c1d34d..1394c83 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -48,7 +48,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
   override protected def interBrokerListenerName: ListenerName = new ListenerName("BROKER")
 
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-  override val serverCount: Int = 2
+  override val brokerCount: Int = 2
 
   private val kafkaServerSaslMechanisms = Seq("SCRAM-SHA-256")
   private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
@@ -161,7 +161,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
     user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
 
-    assertEquals(serverCount, callbackInstances.get)
+    assertEquals(brokerCount, callbackInstances.get)
   }
 
   /**
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 78fc215..57fd552 100644
--- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -30,7 +30,7 @@ import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
 
 class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup {
-  override val serverCount = 1
+  override val brokerCount = 1
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
 
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 0587a6d..49977d0 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -58,7 +58,7 @@ import scala.collection.JavaConverters._
   * would end up with ZooKeeperTestHarness twice.
   */
 abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
-  override val serverCount = 3
+  override val brokerCount = 3
 
   override def configureSecurityBeforeServersStart() {
     AclCommand.main(clusterActionArgs)
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 27c3f31..6d2161d 100644
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -33,9 +33,9 @@ class GroupAuthorizerIntegrationTest extends AuthorizerIntegrationTest {
   override val kafkaPrincipalType = GroupPrincipalType
   override def userPrincipal = TestGroupPrincipal
 
-  override def propertyOverrides(properties: Properties): Unit = {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
     properties.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
       classOf[GroupPrincipalBuilder].getName)
-    super.propertyOverrides(properties)
+    super.brokerPropertyOverrides(properties)
   }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 39f47ab..5ffbc43 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -37,7 +37,7 @@ import scala.collection.mutable
  * A helper class for writing integration tests that involve producers, consumers, and servers
  */
 abstract class IntegrationTestHarness extends KafkaServerTestHarness {
-  protected def serverCount: Int
+  protected def brokerCount: Int
   protected def logDirCount: Int = 1
 
   val producerConfig = new Properties
@@ -49,10 +49,20 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   protected def interBrokerListenerName: ListenerName = listenerName
 
+  protected def modifyConfigs(props: Seq[Properties]): Unit = {
+    configureListeners(props)
+    props.foreach(_ ++= serverConfig)
+  }
+
   override def generateConfigs: Seq[KafkaConfig] = {
-    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
-    cfgs.foreach { config =>
+    modifyConfigs(cfgs)
+    cfgs.map(KafkaConfig.fromProps)
+  }
+
+  protected def configureListeners(props: Seq[Properties]): Unit = {
+    props.foreach { config =>
       config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
       config.setProperty(KafkaConfig.InterBrokerListenerNameProp, interBrokerListenerName.value)
 
@@ -63,8 +73,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
       config.setProperty(KafkaConfig.ListenersProp, listeners)
       config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityMap)
     }
-    cfgs.foreach(_ ++= serverConfig)
-    cfgs.map(KafkaConfig.fromProps)
   }
 
   @Before
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index 08a0224..b30b363 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -41,7 +41,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
 
   val producerCount = 1
   val consumerCount = 2
-  val serverCount = 3
+  val brokerCount = 3
   val groupId = "my-test"
   val clientId = "consumer-498"
 
@@ -70,7 +70,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
   override def setUp() {
     super.setUp()
     client = AdminClient.createSimplePlaintext(this.brokerList)
-    createTopic(topic, 2, serverCount)
+    createTopic(topic, 2, brokerCount)
   }
 
   @After
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index 795f954..8d2e66e 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -33,7 +33,7 @@ import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue}
 class LogAppendTimeTest extends IntegrationTestHarness {
   val producerCount: Int = 1
   val consumerCount: Int = 1
-  val serverCount: Int = 2
+  val brokerCount: Int = 2
 
   // This will be used for the offsets topic as well
   serverConfig.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.name)
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index b2f5a7e..da466b8 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
 
 class MetricsTest extends IntegrationTestHarness with SaslSetup {
 
-  override val serverCount = 1
+  override val brokerCount = 1
 
   override protected def listenerName = new ListenerName("CLIENT")
   private val kafkaClientSaslMechanism = "PLAIN"
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c11fc12..e3251a5 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -19,7 +19,6 @@ import java.util.regex.Pattern
 import java.util.{Collections, Locale, Optional, Properties}
 
 import kafka.log.LogConfig
-import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
@@ -38,6 +37,8 @@ import scala.collection.mutable.Buffer
 import kafka.server.QuotaType
 import kafka.server.KafkaServer
 
+import scala.collection.mutable
+
 /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
 
@@ -342,17 +343,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     sendRecords(producer, numRecords, tp)
 
     val topic1 = "tblablac" // matches subscribed pattern
-    createTopic(topic1, 2, serverCount)
+    createTopic(topic1, 2, brokerCount)
     sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
     sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
 
     val topic2 = "tblablak" // does not match subscribed pattern
-    createTopic(topic2, 2, serverCount)
+    createTopic(topic2, 2, brokerCount)
     sendRecords(producer,numRecords = 1000, new TopicPartition(topic2, 0))
     sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1))
 
     val topic3 = "tblab1" // does not match subscribed pattern
-    createTopic(topic3, 2, serverCount)
+    createTopic(topic3, 2, brokerCount)
     sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0))
     sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1))
 
@@ -370,7 +371,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     awaitAssignment(consumer, assignment)
 
     val topic4 = "tsomec" // matches subscribed pattern
-    createTopic(topic4, 2, serverCount)
+    createTopic(topic4, 2, brokerCount)
     sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0))
     sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1))
 
@@ -404,7 +405,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // the first topic ('topic')  matches first subscription pattern only
 
     val fooTopic = "foo" // matches both subscription patterns
-    createTopic(fooTopic, 1, serverCount)
+    createTopic(fooTopic, 1, brokerCount)
     sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0))
 
     assertEquals(0, consumer.assignment().size)
@@ -419,7 +420,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     awaitAssignment(consumer, assignment)
 
     val barTopic = "bar" // matches the next subscription pattern
-    createTopic(barTopic, 1, serverCount)
+    createTopic(barTopic, 1, brokerCount)
     sendRecords(producer, numRecords = 1000, new TopicPartition(barTopic, 0))
 
     val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this
@@ -450,7 +451,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     sendRecords(producer, numRecords, tp)
 
     val topic1 = "tblablac" // matches the subscription pattern
-    createTopic(topic1, 2, serverCount)
+    createTopic(topic1, 2, brokerCount)
     sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
     sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
 
@@ -517,7 +518,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.subscribe(List(topic).asJava)
     awaitAssignment(consumer, initialAssignment)
 
-    createTopic(otherTopic, 2, serverCount)
+    createTopic(otherTopic, 2, brokerCount)
     val expandedAssignment = initialAssignment ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
     consumer.subscribe(List(topic, otherTopic).asJava)
     awaitAssignment(consumer, expandedAssignment)
@@ -526,7 +527,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testShrinkingTopicSubscriptions() {
     val otherTopic = "other"
-    createTopic(otherTopic, 2, serverCount)
+    createTopic(otherTopic, 2, brokerCount)
     val initialAssignment = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
     val consumer = createConsumer()
     consumer.subscribe(List(topic, otherTopic).asJava)
@@ -781,7 +782,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val partitionCount = 30
     val topics = Seq(topic1, topic2, topic3)
     topics.foreach { topicName =>
-      createTopic(topicName, partitionCount, serverCount)
+      createTopic(topicName, partitionCount, brokerCount)
     }
 
     val partitions = topics.flatMap { topic =>
@@ -861,11 +862,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // for the topic partition assignment
     val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
     try {
-      validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+      validateGroupAssignment(consumerPollers, subscriptions)
 
       // add one more consumer and validate re-assignment
       addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers,
-        List(topic1, topic2), subscriptions)
+        List(topic1, topic2), subscriptions, "roundrobin-group")
     } finally {
       consumerPollers.foreach(_.shutdown())
     }
@@ -900,11 +901,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // create a group of consumers, subscribe the consumers to the single topic and start polling
     // for the topic partition assignment
     val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(9, List(topic), partitions)
-    validateGroupAssignment(consumerPollers, partitions, s"Did not get valid initial assignment for partitions ${partitions.asJava}")
+    validateGroupAssignment(consumerPollers, partitions)
     val prePartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
 
     // add one more consumer and validate re-assignment
-    addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, List(topic), partitions)
+    addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, List(topic), partitions, "sticky-group")
 
     val postPartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
     val keys = prePartition2PollerId.keySet.union(postPartition2PollerId.keySet)
@@ -945,7 +946,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     val consumerPollers = subscribeConsumers(consumersInGroup, List(topic, topic1))
     try {
-      validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+      validateGroupAssignment(consumerPollers, subscriptions)
 
       // add 2 more consumers and validate re-assignment
       addConsumersToGroupAndWaitForGroupAssignment(2, consumersInGroup, consumerPollers, List(topic, topic1), subscriptions)
@@ -1040,7 +1041,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testAutoCommitIntercept() {
     val topic2 = "topic2"
-    createTopic(topic2, 2, serverCount)
+    createTopic(topic2, 2, brokerCount)
 
     // produce records
     val numRecords = 100
@@ -1336,7 +1337,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testAutoCommitOnRebalance() {
     val topic2 = "topic2"
-    createTopic(topic2, 2, serverCount)
+    createTopic(topic2, 2, brokerCount)
 
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
     val consumer = createConsumer()
@@ -1376,7 +1377,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testPerPartitionLeadMetricsCleanUpWithSubscribe() {
     val numMessages = 1000
     val topic2 = "topic2"
-    createTopic(topic2, 2, serverCount)
+    createTopic(topic2, 2, brokerCount)
     // send some messages.
     val producer = createProducer()
     sendRecords(producer, numMessages, tp)
@@ -1415,7 +1416,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testPerPartitionLagMetricsCleanUpWithSubscribe() {
     val numMessages = 1000
     val topic2 = "topic2"
-    createTopic(topic2, 2, serverCount)
+    createTopic(topic2, 2, brokerCount)
     // send some messages.
     val producer = createProducer()
     sendRecords(producer, numMessages, tp)
@@ -1635,16 +1636,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumerPollers += timeoutPoller
 
     // validate the initial assignment
-    validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+    validateGroupAssignment(consumerPollers, subscriptions)
 
     // stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers
     timeoutPoller.shutdown()
     if (closeConsumer)
       timeoutConsumer.close()
 
-    val maxSessionTimeout = this.serverConfig.getProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp).toLong
     validateGroupAssignment(consumerPollers, subscriptions,
-      s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left", 3 * maxSessionTimeout)
+      Some(s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left"), 3 * groupMaxSessionTimeoutMs)
 
     // done with pollers and consumers
     for (poller <- consumerPollers)
@@ -1652,6 +1652,24 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   /**
+    * Creates consumer pollers corresponding to a given consumer group, one per consumer; subscribes consumers to
+    * 'topicsToSubscribe' topics, waits until consumers get topics assignment.
+    *
+    * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller.
+    *
+    * @param consumerGroup consumer group
+    * @param topicsToSubscribe topics to which consumers will subscribe to
+    * @return collection of consumer pollers
+    */
+  def subscribeConsumers(consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
+                         topicsToSubscribe: List[String]): mutable.Buffer[ConsumerAssignmentPoller] = {
+    val consumerPollers = mutable.Buffer[ConsumerAssignmentPoller]()
+    for (consumer <- consumerGroup)
+      consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
+    consumerPollers
+  }
+
+  /**
    * Creates topic 'topicName' with 'numPartitions' partitions and produces 'recordsPerPartition'
    * records to each partition
    */
@@ -1659,7 +1677,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
                                 topicName: String,
                                 numPartitions: Int,
                                 recordsPerPartition: Int): Set[TopicPartition] = {
-    createTopic(topicName, numPartitions, serverCount)
+    createTopic(topicName, numPartitions, brokerCount)
     var parts = Set[TopicPartition]()
     for (partition <- 0 until numPartitions) {
       val tp = new TopicPartition(topicName, partition)
@@ -1670,51 +1688,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   /**
-   * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates
-   * consumer poller and starts polling.
-   * Assumes that the consumer is not subscribed to any topics yet
-   *
-   * @param consumer consumer
-   * @param topicsToSubscribe topics that this consumer will subscribe to
-   * @return consumer poller for the given consumer
-   */
-  def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]],
-                                       topicsToSubscribe: List[String]): ConsumerAssignmentPoller = {
-    assertEquals(0, consumer.assignment().size)
-    val consumerPoller = new ConsumerAssignmentPoller(consumer, topicsToSubscribe)
-    consumerPoller.start()
-    consumerPoller
-  }
-
-  /**
-   * Creates consumer pollers corresponding to a given consumer group, one per consumer; subscribes consumers to
-   * 'topicsToSubscribe' topics, waits until consumers get topics assignment.
-   *
-   * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller.
-   *
-   * @param consumerGroup consumer group
-   * @param topicsToSubscribe topics to which consumers will subscribe to
-   * @return collection of consumer pollers
-   */
-  def subscribeConsumers(consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
-                         topicsToSubscribe: List[String]): Buffer[ConsumerAssignmentPoller] = {
-    val consumerPollers = Buffer[ConsumerAssignmentPoller]()
-    for (consumer <- consumerGroup)
-      consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
-    consumerPollers
-  }
-
-  /**
-   * Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes consumers to
-   * 'topicsToSubscribe' topics, waits until consumers get topics assignment.
-   *
-   * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller.
-   *
-   * @param consumerCount number of consumers to create
-   * @param topicsToSubscribe topics to which consumers will subscribe to
-   * @param subscriptions set of all topic partitions
-   * @return collection of created consumers and collection of corresponding consumer pollers
-   */
+    * Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes consumers to
+    * 'topicsToSubscribe' topics, waits until consumers get topics assignment.
+    *
+    * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller.
+    *
+    * @param consumerCount number of consumers to create
+    * @param topicsToSubscribe topics to which consumers will subscribe to
+    * @param subscriptions set of all topic partitions
+    * @return collection of created consumers and collection of corresponding consumer pollers
+    */
   def createConsumerGroupAndWaitForAssignment(consumerCount: Int,
                                               topicsToSubscribe: List[String],
                                               subscriptions: Set[TopicPartition]): (Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], Buffer[ConsumerAssignmentPoller]) = {
@@ -1728,54 +1711,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     (consumerGroup, consumerPollers)
   }
 
-  /**
-   * Create 'numOfConsumersToAdd' consumers add then to the consumer group 'consumerGroup', and create corresponding
-   * pollers for these consumers. Wait for partition re-assignment and validate.
-   *
-   * Currently, assignment validation requires that total number of partitions is greater or equal to
-   * number of consumers, so subscriptions.size must be greater or equal the resulting number of consumers in the group
-   *
-   * @param numOfConsumersToAdd number of consumers to create and add to the consumer group
-   * @param consumerGroup current consumer group
-   * @param consumerPollers current consumer pollers
-   * @param topicsToSubscribe topics to which new consumers will subscribe to
-   * @param subscriptions set of all topic partitions
-   */
-  def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int,
-                                                   consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
-                                                   consumerPollers: Buffer[ConsumerAssignmentPoller],
-                                                   topicsToSubscribe: List[String],
-                                                   subscriptions: Set[TopicPartition]): Unit = {
-    assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
-    for (_ <- 0 until numOfConsumersToAdd) {
-      val consumer = createConsumer()
-      consumerGroup += consumer
-      consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
-    }
-
-    // wait until topics get re-assigned and validate assignment
-    validateGroupAssignment(consumerPollers, subscriptions,
-      s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added $numOfConsumersToAdd consumer(s)")
-  }
-
-  /**
-   * Wait for consumers to get partition assignment and validate it.
-   *
-   * @param consumerPollers consumer pollers corresponding to the consumer group we are testing
-   * @param subscriptions set of all topic partitions
-   * @param msg message to print when waiting for/validating assignment fails
-   */
-  def validateGroupAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller],
-                              subscriptions: Set[TopicPartition],
-                              msg: String,
-                              waitTime: Long = 10000L): Unit = {
-    TestUtils.waitUntilTrue(() => {
-      val assignments = Buffer[Set[TopicPartition]]()
-      consumerPollers.foreach(assignments += _.consumerAssignment())
-      isPartitionAssignmentValid(assignments, subscriptions)
-    }, msg, waitTime)
-  }
-
   def changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller],
                                                            topicsToSubscribe: List[String],
                                                            subscriptions: Set[TopicPartition]): Unit = {
@@ -1785,11 +1720,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // since subscribe call to poller does not actually call consumer subscribe right away, wait
     // until subscribe is called on all consumers
     TestUtils.waitUntilTrue(() => {
-      consumerPollers forall (poller => poller.isSubscribeRequestProcessed())
-    }, s"Failed to call subscribe on all consumers in the group for subscription ${subscriptions}", 1000L)
+      consumerPollers.forall { poller => poller.isSubscribeRequestProcessed }
+    }, s"Failed to call subscribe on all consumers in the group for subscription $subscriptions", 1000L)
 
     validateGroupAssignment(consumerPollers, subscriptions,
-      s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription")
+      Some(s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription"))
   }
 
   def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V],
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index c353b52..bbb3d19 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -39,7 +39,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
   val consumerCount = 1
   val producerCount = 1
-  val serverCount = 1
+  val brokerCount = 1
 
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
   this.serverConfig.setProperty(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
@@ -62,7 +62,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
       JaasTestUtils.KafkaServerContextName))
     super.setUp()
-    createTopic(topic, numPartitions, serverCount)
+    createTopic(topic, numPartitions, brokerCount)
   }
 
   @After
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index c15a508..59c6d34 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -13,7 +13,7 @@
 package kafka.api
 
 import java.io.File
-import java.util.Locale
+import java.util.{Locale, Properties}
 
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
@@ -47,6 +47,11 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
     closeSasl()
   }
 
+  override def modifyConfigs(props: Seq[Properties]): Unit = {
+    super.modifyConfigs(props)
+    configureListeners(props)
+  }
+
   /**
    * Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths
    * when zookeeper.set.acl=false, even if ZooKeeper is SASL-enabled.
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index 41e1ff9..8686575 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -39,7 +39,7 @@ import scala.collection.JavaConverters._
 
 class DynamicConnectionQuotaTest extends BaseRequestTest {
 
-  override def numBrokers = 1
+  override def brokerCount = 1
 
   val topic = "test"
   val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
@@ -49,7 +49,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
   @Before
   override def setUp(): Unit = {
     super.setUp()
-    TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers)
+    TestUtils.createTopic(zkClient, topic, brokerCount, brokerCount, servers)
   }
 
   @After
@@ -64,8 +64,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     }
   }
 
-  override protected def propertyOverrides(properties: Properties): Unit = {
-    super.propertyOverrides(properties)
+  override protected def brokerPropertyOverrides(properties: Properties): Unit = {
+    super.brokerPropertyOverrides(properties)
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 07885cd..8842171 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -39,7 +39,7 @@ import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
 
 class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
-  override val serverCount = 1
+  override val brokerCount = 1
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
 
   private val kafkaClientSaslMechanism = "GSSAPI"
@@ -70,7 +70,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
     clientConfig.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, "5000")
 
     // create the test topic with all the brokers as replicas
-    createTopic(topic, 2, serverCount)
+    createTopic(topic, 2, brokerCount)
   }
 
   @After
diff --git a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
index 0f40650..4c2c640 100644
--- a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
@@ -36,7 +36,7 @@ import scala.collection.JavaConverters._
  */
 class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup {
 
-  override val serverCount = 1
+  override val brokerCount = 1
 
   private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
   private val kafkaServerSaslMechanisms = Collections.singletonList("SCRAM-SHA-256").asScala
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 3128346..a5f1bab 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
 
 class AddPartitionsTest extends BaseRequestTest {
 
-  protected override def numBrokers: Int = 4
+  override def brokerCount: Int = 4
 
   val partitionId = 0
 
diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
index a4c27e5..0c9f4d3 100644
--- a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala
@@ -39,7 +39,7 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
   protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
   var adminClient: org.apache.kafka.clients.admin.AdminClient = null
 
-  override def numBrokers = 1
+  override def brokerCount = 1
 
   @Before
   override def setUp(): Unit = {
@@ -48,11 +48,11 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
   }
 
   override def generateConfigs = {
-    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+    val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect,
       enableControlledShutdown = false,
       interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
-    props.foreach(propertyOverrides)
+    props.foreach(brokerPropertyOverrides)
     props.map(KafkaConfig.fromProps)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index d6262b3..a54e5fc 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
 
 class AbstractCreateTopicsRequestTest extends BaseRequestTest {
 
-  override def propertyOverrides(properties: Properties): Unit =
+  override def brokerPropertyOverrides(properties: Properties): Unit =
     properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
 
   def topicsReq(topics: Seq[CreatableTopic],
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
index 9071f95..d15409c 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
@@ -31,7 +31,7 @@ class AddPartitionsToTxnRequestTest extends BaseRequestTest {
   private val topic1 = "foobartopic"
   val numPartitions = 3
 
-  override def propertyOverrides(properties: Properties): Unit =
+  override def brokerPropertyOverrides(properties: Properties): Unit =
     properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
 
   @Before
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
index 28ed81d..d61c14c 100644
--- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
@@ -33,7 +33,7 @@ import scala.util.Random
 
 class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
   override val logDirCount = 5
-  override val numBrokers = 1
+  override val brokerCount = 1
 
   val topic = "topic"
 
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 83f7111..ab70aec 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -40,7 +40,7 @@ object ApiVersionsRequestTest {
 
 class ApiVersionsRequestTest extends BaseRequestTest {
 
-  override def numBrokers: Int = 1
+  override def brokerCount: Int = 1
 
   @Test
   def testApiVersionsRequest() {
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 09ffe4f..a92aeaf 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -24,7 +24,6 @@ import java.util.Properties
 
 import kafka.api.IntegrationTestHarness
 import kafka.network.SocketServer
-import kafka.utils._
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.protocol.ApiKeys
@@ -32,22 +31,19 @@ import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestRespons
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
 abstract class BaseRequestTest extends IntegrationTestHarness {
-  override val serverCount: Int = numBrokers
   private var correlationId = 0
 
   // If required, set number of brokers
-  protected def numBrokers: Int = 3
+  override def brokerCount: Int = 3
 
   // If required, override properties by mutating the passed Properties object
-  protected def propertyOverrides(properties: Properties) {}
-
-  override def generateConfigs = {
-    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
-      enableControlledShutdown = false,
-      interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
-    props.foreach(propertyOverrides)
-    props.map(KafkaConfig.fromProps)
+  protected def brokerPropertyOverrides(properties: Properties) {}
+
+  override def modifyConfigs(props: Seq[Properties]): Unit = {
+    props.foreach { p =>
+      p.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+      brokerPropertyOverrides(p)
+    }
   }
 
   def anySocketServer = {
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index db2028c..709b3c9 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -55,7 +55,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
     validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions", numPartitions = -1))),
       Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)), checkErrorMessage = false)
     validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
-      replicationFactor = numBrokers + 1))),
+      replicationFactor = brokerCount + 1))),
       Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR)), checkErrorMessage = false)
     validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-config",
       config=Map("not.a.property" -> "error")))),
@@ -71,7 +71,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
     validateErrorCreateTopicsRequests(topicsReq(Seq(
       topicReq(existingTopic),
       topicReq("partial-partitions", numPartitions = -1),
-      topicReq("partial-replication", replicationFactor=numBrokers + 1),
+      topicReq("partial-replication", replicationFactor=brokerCount + 1),
       topicReq("partial-assignment", assignment=Map(0 -> List(0, 1), 1 -> List(0))),
       topicReq("partial-none"))),
       Map(
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index 4fc3244..0395484 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -32,8 +32,8 @@ import scala.collection.JavaConverters._
 class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest {
   import CreateTopicsRequestWithPolicyTest._
 
-  override def propertyOverrides(properties: Properties): Unit = {
-    super.propertyOverrides(properties)
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    super.brokerPropertyOverrides(properties)
     properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[Policy].getName)
   }
 
@@ -94,7 +94,7 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
         Some("Topic 'existing-topic' already exists."))))
 
     validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
-      numPartitions = 10, replicationFactor = numBrokers + 1)), validateOnly = true),
+      numPartitions = 10, replicationFactor = brokerCount + 1)), validateOnly = true),
       Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
         Some("Replication factor: 4 larger than available brokers: 3."))))
 
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index 6d56a02..0a7e194 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -29,7 +29,7 @@ import scala.concurrent.ExecutionException
 class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
   var adminClient: AdminClient = null
 
-  override def numBrokers = 1
+  override def brokerCount = 1
 
   @Before
   override def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index 6f79a9a..ae65016 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -38,7 +38,7 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
   protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
   var adminClient: AdminClient = null
 
-  override def numBrokers = 1
+  override def brokerCount = 1
 
   @Before
   override def setUp(): Unit = {
@@ -47,11 +47,11 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
   }
 
   override def generateConfigs = {
-    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+    val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect,
       enableControlledShutdown = false,
       interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
-    props.foreach(propertyOverrides)
+    props.foreach(brokerPropertyOverrides)
     props.map(KafkaConfig.fromProps)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index 1203f83..7de624f 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -36,7 +36,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
   protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
   var adminClient: AdminClient = null
 
-  override def numBrokers = 1
+  override def brokerCount = 1
 
   @Before
   override def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
index 7240d77..6a01165 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
@@ -29,14 +29,14 @@ import java.util.Collections
 
 class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
 
-  override def numBrokers: Int = 1
+  override def brokerCount: Int = 1
 
   override def generateConfigs = {
-    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+    val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect,
       enableControlledShutdown = false, enableDeleteTopic = false,
       interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
-    props.foreach(propertyOverrides)
+    props.foreach(brokerPropertyOverrides)
     props.map(KafkaConfig.fromProps)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index 5a0244b..a1e4c73 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -27,7 +27,7 @@ import java.io.File
 
 class DescribeLogDirsRequestTest extends BaseRequestTest {
   override val logDirCount = 2
-  override val numBrokers: Int = 1
+  override val brokerCount: Int = 1
 
   val topic = "topic"
   val partitionNum = 2
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index 673abe6..4ae7d93 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -32,7 +32,7 @@ import org.junit.Test
 
 class FetchRequestDownConversionConfigTest extends BaseRequestTest {
   private var producer: KafkaProducer[String, String] = null
-  override def numBrokers: Int = 1
+  override def brokerCount: Int = 1
 
   override def setUp(): Unit = {
     super.setUp()
@@ -45,8 +45,8 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
     super.tearDown()
   }
 
-  override protected def propertyOverrides(properties: Properties): Unit = {
-    super.propertyOverrides(properties)
+  override protected def brokerPropertyOverrides(properties: Properties): Unit = {
+    super.brokerPropertyOverrides(properties)
     properties.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
index 30f3b23..f3580cf 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
@@ -36,9 +36,9 @@ import java.util.concurrent.atomic.AtomicInteger
  */
 class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {
 
-  override def numBrokers: Int = 1
+  override def brokerCount: Int = 1
 
-  override def propertyOverrides(properties: Properties): Unit = {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
     properties.put(KafkaConfig.MetricReporterClassesProp, classOf[KafkaMetricReporterExceptionHandlingTest.BadReporter].getName + "," + classOf[KafkaMetricReporterExceptionHandlingTest.GoodReporter].getName)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index d56a9f0..3eff38f 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -41,7 +41,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
 
   val producerCount: Int = 1
   val consumerCount: Int = 1
-  val serverCount: Int = 2
+  val brokerCount: Int = 2
   private val topic = "topic"
   private val partitionNum = 12
   override val logDirCount = 3
@@ -52,7 +52,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    createTopic(topic, partitionNum, serverCount)
+    createTopic(topic, partitionNum, brokerCount)
   }
 
   @Test
@@ -71,7 +71,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
 
     var server: KafkaServer = null
     try {
-      val props = TestUtils.createBrokerConfig(serverCount, zkConnect, logDirCount = 3)
+      val props = TestUtils.createBrokerConfig(brokerCount, zkConnect, logDirCount = 3)
       props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.11.0")
       props.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0")
       val kafkaConfig = KafkaConfig.fromProps(props)
@@ -118,7 +118,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     // has fetched from the leader and attempts to append to the offline replica.
     producer.send(record).get
 
-    assertEquals(serverCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size)
+    assertEquals(brokerCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size)
     followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread =>
       assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete)
     }
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 04b3467..7b52e7b 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -38,11 +38,11 @@ class LogOffsetTest extends BaseRequestTest {
 
   private lazy val time = new MockTime
 
-  protected override def numBrokers = 1
+  override def brokerCount = 1
 
   protected override def brokerTime(brokerId: Int) = time
 
-  protected override def propertyOverrides(props: Properties): Unit = {
+  protected override def brokerPropertyOverrides(props: Properties): Unit = {
     props.put("log.flush.interval.messages", "1")
     props.put("num.partitions", "20")
     props.put("log.retention.hours", "10")
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index bde16b6..f920d94 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -34,7 +34,7 @@ import scala.collection.JavaConverters._
 
 class MetadataRequestTest extends BaseRequestTest {
 
-  override def propertyOverrides(properties: Properties) {
+  override def brokerPropertyOverrides(properties: Properties) {
     properties.setProperty(KafkaConfig.DefaultReplicationFactorProp, "2")
     properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}")
   }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 671f9e0..d04f39f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -49,7 +49,7 @@ import scala.collection.mutable.ListBuffer
 
 class RequestQuotaTest extends BaseRequestTest {
 
-  override def numBrokers: Int = 1
+  override def brokerCount: Int = 1
 
   private val topic = "topic-1"
   private val numPartitions = 1
@@ -66,7 +66,7 @@ class RequestQuotaTest extends BaseRequestTest {
   private val executor = Executors.newCachedThreadPool
   private val tasks = new ListBuffer[Task]
 
-  override def propertyOverrides(properties: Properties): Unit = {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
     properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
     properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 185a2f4..7dcc96b 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -36,7 +36,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup {
   private val kafkaServerSaslMechanisms = List("PLAIN")
   protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
   protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-  override def numBrokers = 1
+  override def brokerCount = 1
 
   @Before
   override def setUp(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
index 2d0f1db..f246b25 100644
--- a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
@@ -28,7 +28,7 @@ import collection.JavaConverters._
 
 class StopReplicaRequestTest extends BaseRequestTest {
   override val logDirCount = 2
-  override val numBrokers: Int = 1
+  override val brokerCount: Int = 1
 
   val topic = "topic"
   val partitionNum = 2