You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/06 12:32:31 UTC

kafka git commit: MINOR: Create offsets topic explicitly in DescribeConsumerGroupTest

Repository: kafka
Updated Branches:
  refs/heads/trunk 17b2bde4b -> 313f8d7dd


MINOR: Create offsets topic explicitly in DescribeConsumerGroupTest

This should fix transient failures due to timeouts caused by slow
auto creation of the offsets topic. The one exception is
`testDescribeGroupWithNewConsumerWithShortInitializationTimeout` where
we want initialisation to take longer than the timeout so we let
it be auto created. That test has also been a bit flaky and I reduced
the timeout to 1 ms.

Also:
- Simplified resource handling
- Removed usage of EasyMock that didn't make sense.
- `findCoordinator` should retry if `sendAnyNode` throws an exception

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #3237 from ijuma/kafka-49480-describe-consumer-group-test-failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/313f8d7d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/313f8d7d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/313f8d7d

Branch: refs/heads/trunk
Commit: 313f8d7ddc48943d9f2290a27751d7a0fa1d3a82
Parents: 17b2bde
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Jun 6 13:31:36 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Jun 6 13:31:46 2017 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/admin/AdminClient.scala    |  36 ++-
 .../other/kafka/ReplicationQuotasTestRig.scala  |   5 -
 .../kafka/admin/DescribeConsumerGroupTest.scala | 284 ++++++++-----------
 .../kafka/admin/ReplicationQuotaUtils.scala     |   4 +-
 .../kafka/server/ReplicationQuotasTest.scala    |   4 +-
 ...rivenReplicationProtocolAcceptanceTest.scala |   2 +-
 .../epoch/LeaderEpochIntegrationTest.scala      |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  28 +-
 8 files changed, 174 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 4410e94..7bd626a 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -16,14 +16,14 @@ import java.io.IOException
 import java.nio.ByteBuffer
 import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, Future}
+import java.util.concurrent.{ConcurrentLinkedQueue, Future, TimeUnit}
+
 import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.common.KafkaException
 import kafka.coordinator.group.GroupOverview
 import kafka.utils.Logging
-
 import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{RequestFutureAdapter, ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
+import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, RequestFutureAdapter}
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.TimeoutException
@@ -38,7 +38,7 @@ import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 /**
   * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
@@ -104,20 +104,32 @@ class AdminClient(val time: Time,
   }
 
   def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = {
+    val requestBuilder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId)
+
+    def sendRequest: Try[FindCoordinatorResponse] =
+      Try(sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse])
+
     val startTime = time.milliseconds
-    val requestBuilder = new FindCoordinatorRequest.Builder(org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType.GROUP, groupId)
-    var response = sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse]
+    var response = sendRequest
+
+    while ((response.isFailure || response.get.error == Errors.COORDINATOR_NOT_AVAILABLE) &&
+      (time.milliseconds - startTime < timeoutMs)) {
 
-    while (response.error == Errors.COORDINATOR_NOT_AVAILABLE && time.milliseconds - startTime < timeoutMs) {
       Thread.sleep(retryBackoffMs)
-      response = sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse]
+      response = sendRequest
     }
 
-    if (response.error == Errors.COORDINATOR_NOT_AVAILABLE)
-      throw new TimeoutException("The consumer group command timed out while waiting for group to initialize: ", response.error.exception)
+    def timeoutException(cause: Throwable) =
+      throw new TimeoutException("The consumer group command timed out while waiting for group to initialize: ", cause)
 
-    response.error.maybeThrow()
-    response.node
+    response match {
+      case Failure(exception) => throw timeoutException(exception)
+      case Success(response) =>
+        if (response.error == Errors.COORDINATOR_NOT_AVAILABLE)
+          throw timeoutException(response.error.exception)
+        response.error.maybeThrow()
+        response.node
+    }
   }
 
   def listGroups(node: Node): List[GroupOverview] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index d8bc65e..8a26cac 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -187,11 +187,6 @@ object ReplicationQuotasTestRig {
       println(s"Worst case duration is ${config.targetBytesPerBrokerMB * 1000 * 1000/ config.throttle}")
     }
 
-    private def waitForOffsetsToMatch(offset: Int, partitionId: Int, broker: KafkaServer, topic: String): Boolean = waitUntilTrue(() => {
-      offset == broker.getLogManager.getLog(new TopicPartition(topic, partitionId))
-        .map(_.logEndOffset).getOrElse(0)
-    }, s"Offsets did not match for partition $partitionId on broker ${broker.config.brokerId}", 60000)
-
     def waitForReassignmentToComplete() {
       waitUntilTrue(() => {
         printRateMetrics()

http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 6a68e52..d3f9573 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -22,184 +22,146 @@ import java.util.concurrent.TimeUnit
 import java.util.Collections
 import java.util.Properties
 
-import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.Before
-import org.junit.Test
-import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
-import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
-import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
+import org.junit.{After, Before, Test}
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService, ZkConsumerGroupService}
 import kafka.consumer.OldConsumer
 import kafka.consumer.Whitelist
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.errors.{CoordinatorNotAvailableException, WakeupException}
+import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
 
+import scala.collection.mutable.ArrayBuffer
 
 class DescribeConsumerGroupTest extends KafkaServerTestHarness {
+  private val topic = "foo"
+  private val group = "test.group"
 
-  val overridingProps = new Properties()
-  val topic = "foo"
-  val topicFilter = Whitelist(topic)
-  val group = "test.group"
-  val props = new Properties
+  @deprecated("This field will be removed in a future release", "0.11.0.0")
+  private val oldConsumers = new ArrayBuffer[OldConsumer]
+  private var consumerGroupService: ConsumerGroupService = _
+  private var consumerGroupExecutor: ConsumerGroupExecutor = _
 
   // configure the servers and clients
-  override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+  override def generateConfigs() = {
+    TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
+      KafkaConfig.fromProps(props)
+    }
+  }
 
   @Before
   override def setUp() {
     super.setUp()
-
     AdminUtils.createTopic(zkUtils, topic, 1, 1)
-    props.setProperty("group.id", group)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (consumerGroupService != null)
+      consumerGroupService.close()
+    if (consumerGroupExecutor != null)
+      consumerGroupExecutor.shutdown()
+    oldConsumers.foreach(_.stop())
+    super.tearDown()
   }
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeNonExistingGroup() {
-    // mocks
-    props.setProperty("zookeeper.connect", zkConnect)
-    val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
-
-    // stubs
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    createOldConsumer()
     val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group"))
-    val consumerGroupCommand = new ZkConsumerGroupService(opts)
-
-    // simulation
-    EasyMock.replay(consumerMock)
-
-    // action/test
-    TestUtils.waitUntilTrue(() => consumerGroupCommand.describeGroup()._2.isEmpty, "Expected no rows in describe group results.")
-
-    // cleanup
-    consumerGroupCommand.close()
-    consumerMock.stop()
+    consumerGroupService = new ZkConsumerGroupService(opts)
+    TestUtils.waitUntilTrue(() => consumerGroupService.describeGroup()._2.isEmpty, "Expected no rows in describe group results.")
   }
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeExistingGroup() {
-    // mocks
-    props.setProperty("zookeeper.connect", zkConnect)
-    val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
-
-    // stubs
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    createOldConsumer()
     val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
-    val consumerGroupCommand = new ZkConsumerGroupService(opts)
-
-    // simulation
-    EasyMock.replay(consumerMock)
-
-    // action/test
+    consumerGroupService = new ZkConsumerGroupService(opts)
     TestUtils.waitUntilTrue(() => {
-        val (_, assignments) = consumerGroupCommand.describeGroup()
-        assignments.isDefined &&
-        assignments.get.count(_.group == group) == 1 &&
-        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-      }, "Expected rows and a consumer id column in describe group results.")
-
-    // cleanup
-    consumerGroupCommand.close()
-    consumerMock.stop()
+      val (_, assignments) = consumerGroupService.describeGroup()
+      assignments.isDefined &&
+      assignments.get.count(_.group == group) == 1 &&
+      assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+    }, "Expected rows and a consumer id column in describe group results.")
   }
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeExistingGroupWithNoMembers() {
-    // mocks
-    props.setProperty("zookeeper.connect", zkConnect)
-    val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
-
-    // stubs
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    createOldConsumer()
     val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
-    val consumerGroupCommand = new ZkConsumerGroupService(opts)
-
-    // simulation
-    EasyMock.replay(consumerMock)
+    consumerGroupService = new ZkConsumerGroupService(opts)
 
-    // action/test
     TestUtils.waitUntilTrue(() => {
-        val (_, assignments) = consumerGroupCommand.describeGroup()
-        assignments.isDefined &&
-        assignments.get.count(_.group == group) == 1 &&
-        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-      }, "Expected rows and a consumer id column in describe group results.")
-    consumerMock.stop()
+      val (_, assignments) = consumerGroupService.describeGroup()
+      assignments.isDefined &&
+      assignments.get.count(_.group == group) == 1 &&
+      assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+    }, "Expected rows and a consumer id column in describe group results.")
+    oldConsumers.head.stop()
 
     TestUtils.waitUntilTrue(() => {
-        val (_, assignments) = consumerGroupCommand.describeGroup()
-        assignments.isDefined &&
-        assignments.get.count(_.group == group) == 1 &&
-        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) // the member should be gone
-      }, "Expected no active member in describe group results.")
-
-    // cleanup
-    consumerGroupCommand.close()
+      val (_, assignments) = consumerGroupService.describeGroup()
+      assignments.isDefined &&
+      assignments.get.count(_.group == group) == 1 &&
+      assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) // the member should be gone
+    }, "Expected no active member in describe group results.")
   }
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeConsumersWithNoAssignedPartitions() {
-    // mocks
-    props.setProperty("zookeeper.connect", zkConnect)
-    val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
-    val consumer2Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
-
-    // stubs
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    createOldConsumer()
+    createOldConsumer()
     val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
-    val consumerGroupCommand = new ZkConsumerGroupService(opts)
-
-    EasyMock.replay(consumer1Mock)
-    EasyMock.replay(consumer2Mock)
-
-    // action/test
+    consumerGroupService = new ZkConsumerGroupService(opts)
     TestUtils.waitUntilTrue(() => {
-        val (_, assignments) = consumerGroupCommand.describeGroup()
-        assignments.isDefined &&
-        assignments.get.count(_.group == group) == 2 &&
-        assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
-        assignments.get.count { x => x.group == group && !x.partition.isDefined } == 1
-      }, "Expected rows for consumers with no assigned partitions in describe group results.")
-
-    // cleanup
-    consumerGroupCommand.close()
-    consumer1Mock.stop()
-    consumer2Mock.stop()
+      val (_, assignments) = consumerGroupService.describeGroup()
+      assignments.isDefined &&
+      assignments.get.count(_.group == group) == 2 &&
+      assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
+      assignments.get.count { x => x.group == group && x.partition.isEmpty } == 1
+    }, "Expected rows for consumers with no assigned partitions in describe group results.")
   }
 
   @Test
   def testDescribeNonExistingGroupWithNewConsumer() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
     // run one consumer in the group consuming from a single-partition topic
-    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
 
     // note the group to be queried is a different (non-existing) group
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    consumerGroupService = new KafkaConsumerGroupService(opts)
 
-    val (state, assignments) = consumerGroupCommand.describeGroup()
+    val (state, assignments) = consumerGroupService.describeGroup()
     assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List()))
-    consumerGroupCommand.close()
-    executor.shutdown()
   }
 
   @Test
   def testDescribeExistingGroupWithNewConsumer() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
     // run one consumer in the group consuming from a single-partition topic
-    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    consumerGroupService = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-        val (state, assignments) = consumerGroupCommand.describeGroup()
+        val (state, assignments) = consumerGroupService.describeGroup()
         state == Some("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 1 &&
@@ -207,112 +169,110 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
     }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.")
-
-    consumerGroupCommand.close()
-    executor.shutdown()
   }
 
   @Test
   def testDescribeExistingGroupWithNoMembersWithNewConsumer() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
     // run one consumer in the group consuming from a single-partition topic
-    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    consumerGroupService = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, _) = consumerGroupCommand.describeGroup()
+      val (state, _) = consumerGroupService.describeGroup()
       state == Some("Stable")
     }, "Expected the group to initially become stable.")
 
     // stop the consumer so the group has no active member anymore
-    executor.shutdown()
-
-    TestUtils.waitUntilTrue(() => {
-        val (state, assignments) = consumerGroupCommand.describeGroup()
-        state == Some("Empty") &&
-        assignments.isDefined &&
-        assignments.get.count(_.group == group) == 1 &&
-        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
-        assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
-        assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-    }, "Expected no active member in describe group results.")
-
-    consumerGroupCommand.close()
+    consumerGroupExecutor.shutdown()
+
+    val (result, succeeded) = TestUtils.computeUntilTrue(consumerGroupService.describeGroup()) { case (state, assignments) =>
+      val testGroupAssignments = assignments.toSeq.flatMap(_.filter(_.group == group))
+      def assignment = testGroupAssignments.head
+      state == Some("Empty") &&
+        testGroupAssignments.size == 1 &&
+        assignment.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
+        assignment.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+        assignment.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+    }
+    val (state, assignments) = result
+    assertTrue(s"Expected no active member in describe group results, state: $state, assignments: $assignments",
+      succeeded)
   }
 
   @Test
   def testDescribeConsumersWithNoAssignedPartitionsWithNewConsumer() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
     // run two consumers in the group consuming from a single-partition topic
-    val executor = new ConsumerGroupExecutor(brokerList, 2, group, topic)
+    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    consumerGroupService = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-        val (state, assignments) = consumerGroupCommand.describeGroup()
-        state == Some("Stable") &&
+      val (state, assignments) = consumerGroupService.describeGroup()
+      state == Some("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 2 &&
-        assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 &&
-        assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1
-    }, "Expected rows for consumers with no assigned partitions in describe group results.")
-
-    consumerGroupCommand.close()
-    executor.shutdown()
+        assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
+        assignments.get.count { x => x.group == group && x.partition.isEmpty } == 1
+    }, "Expected rows for consumers with no assigned partitions in describe group results")
   }
 
   @Test
   def testDescribeWithMultiPartitionTopicAndMultipleConsumersWithNewConsumer() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
     val topic2 = "foo2"
     AdminUtils.createTopic(zkUtils, topic2, 2, 1)
 
     // run two consumers in the group consuming from a two-partition topic
-    val executor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)
+    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    consumerGroupService = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = consumerGroupCommand.describeGroup()
+      val (state, assignments) = consumerGroupService.describeGroup()
       state == Some("Stable") &&
       assignments.isDefined &&
       assignments.get.count(_.group == group) == 2 &&
       assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
-      assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0
+      assignments.get.count{ x => x.group == group && x.partition.isEmpty} == 0
     }, "Expected two rows (one row per consumer) in describe group results.")
-
-    consumerGroupCommand.close()
-    executor.shutdown()
   }
 
   @Test
   def testDescribeGroupWithNewConsumerWithShortInitializationTimeout() {
+    // Let creation of the offsets topic happen during group initialisation to ensure that initialization doesn't
+    // complete before the timeout expires
+
     // run one consumer in the group consuming from a single-partition topic
-    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
 
     // set the group initialization timeout too low for the group to stabilize
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "10")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "1")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    consumerGroupService = new KafkaConsumerGroupService(opts)
 
     try {
-      consumerGroupCommand.describeGroup()
+      consumerGroupService.describeGroup()
       fail("The consumer group command should fail due to low initialization timeout")
     } catch {
-      case _: TimeoutException =>
-        // OK
-      case e: Throwable =>
-        fail("An unexpected exception occurred: " + e.getMessage)
-        throw e
-    } finally {
-      consumerGroupCommand.close()
-      executor.shutdown()
+      case _: TimeoutException => // OK
     }
   }
+
+  private def createOldConsumer(): Unit = {
+    val consumerProps = new Properties
+    consumerProps.setProperty("group.id", group)
+    consumerProps.setProperty("zookeeper.connect", zkConnect)
+    oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
+  }
 }
 
 
@@ -330,7 +290,7 @@ class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) ex
       while (true)
         consumer.poll(Long.MaxValue)
     } catch {
-      case e: WakeupException => // OK
+      case _: WakeupException => // OK
     } finally {
       consumer.close()
     }
@@ -344,24 +304,18 @@ class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) ex
 
 class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) {
   val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers)
-  var consumers = List[ConsumerThread]()
+  private val consumers = new ArrayBuffer[ConsumerThread]()
   for (i <- 1 to numConsumers) {
     val consumer = new ConsumerThread(broker, i, groupId, topic)
-    consumers ++= List(consumer)
+    consumers += consumer
     executor.submit(consumer)
   }
 
-  Runtime.getRuntime().addShutdownHook(new Thread() {
-    override def run() {
-      shutdown()
-    }
-  })
-
   def shutdown() {
     consumers.foreach(_.shutdown)
     executor.shutdown()
     try {
-      executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+      executor.awaitTermination(5000, TimeUnit.MILLISECONDS)
     } catch {
       case e: InterruptedException =>
         e.printStackTrace()

http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
index 9608892..bdabd67 100644
--- a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
@@ -20,7 +20,7 @@ import scala.collection.Seq
 
 object ReplicationQuotaUtils {
 
-  def checkThrottleConfigRemovedFromZK(topic: String, servers: Seq[KafkaServer]): Boolean = {
+  def checkThrottleConfigRemovedFromZK(topic: String, servers: Seq[KafkaServer]): Unit = {
     TestUtils.waitUntilTrue(() => {
       val hasRateProp = servers.forall { server =>
         val brokerConfig = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker, server.config.brokerId.toString)
@@ -34,7 +34,7 @@ object ReplicationQuotaUtils {
     }, "Throttle limit/replicas was not unset")
   }
 
-  def checkThrottleConfigAddedToZK(expectedThrottleRate: Long, servers: Seq[KafkaServer], topic: String, throttledLeaders: String, throttledFollowers: String): Boolean = {
+  def checkThrottleConfigAddedToZK(expectedThrottleRate: Long, servers: Seq[KafkaServer], topic: String, throttledLeaders: String, throttledFollowers: String): Unit = {
     TestUtils.waitUntilTrue(() => {
       //Check for limit in ZK
       val brokerConfigAvailable = servers.forall { server =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 5fc4c0f..e7d2a64 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -208,13 +208,13 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
       throttledTook < expectedDuration * 1000 * 1.5)
   }
 
-  def addData(msgCount: Int, msg: Array[Byte]): Boolean = {
+  def addData(msgCount: Int, msg: Array[Byte]): Unit = {
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
     (0 until msgCount).map(_ => producer.send(new ProducerRecord(topic, msg))).foreach(_.get)
     waitForOffsetsToMatch(msgCount, 0, 100)
   }
 
-  private def waitForOffsetsToMatch(offset: Int, partitionId: Int, brokerId: Int): Boolean = {
+  private def waitForOffsetsToMatch(offset: Int, partitionId: Int, brokerId: Int): Unit = {
     waitUntilTrue(() => {
       offset == brokerFor(brokerId).getLogManager.getLog(new TopicPartition(topic, partitionId))
         .map(_.logEndOffset).getOrElse(0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 182e904..93505c2 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -368,7 +368,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
       .records.batches().asScala.toSeq.last
   }
 
-  private def awaitISR(tp: TopicPartition): Boolean = {
+  private def awaitISR(tp: TopicPartition): Unit = {
     TestUtils.waitUntilTrue(() => {
       leader.replicaManager.getReplicaOrException(tp).partition.inSyncReplicas.map(_.brokerId).size == 2
     }, "")

http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index f7110ee..5e069a1 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -219,7 +219,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     new ReplicaFetcherBlockingSend(endPoint, from.config, new Metrics(), new SystemTime(), 42, "TestFetcher")
   }
 
-  private def waitForEpochChangeTo(topic: String, partition: Int, epoch: Int): Boolean = {
+  private def waitForEpochChangeTo(topic: String, partition: Int, epoch: Int): Unit = {
     TestUtils.waitUntilTrue(() => {
       brokers(0).metadataCache.getPartitionInfo(topic, partition) match {
         case Some(m) => m.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch == epoch

http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 14edd35..a2c9b05 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -22,7 +22,7 @@ import java.nio._
 import java.nio.channels._
 import java.nio.charset.{Charset, StandardCharsets}
 import java.security.cert.X509Certificate
-import java.util.{ArrayList, Collections, Properties}
+import java.util.{Collections, Properties}
 import java.util.concurrent.{Callable, Executors, TimeUnit}
 import javax.net.ssl.X509TrustManager
 
@@ -838,11 +838,11 @@ object TestUtils extends Logging {
    * Wait until the given condition is true or throw an exception if the given wait time elapses.
    */
   def waitUntilTrue(condition: () => Boolean, msg: => String,
-                    waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Boolean = {
+                    waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Unit = {
     val startTime = System.currentTimeMillis()
     while (true) {
       if (condition())
-        return true
+        return
       if (System.currentTimeMillis() > startTime + waitTime)
         fail(msg)
       Thread.sleep(waitTime.min(pause))
@@ -851,6 +851,28 @@ object TestUtils extends Logging {
     throw new RuntimeException("unexpected error")
   }
 
+  /**
+    * Invoke `compute` until `predicate` is true or `waitTime` elapses.
+    *
+    * Return the last `compute` result and a boolean indicating whether `predicate` succeeded for that value.
+    *
+    * This method is useful in cases where `waitUntilTrue` makes it awkward to provide good error messages.
+    */
+  def computeUntilTrue[T](compute: => T, waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(
+                    predicate: T => Boolean): (T, Boolean) = {
+    val startTime = System.currentTimeMillis()
+    while (true) {
+      val result = compute
+      if (predicate(result))
+        return result -> true
+      if (System.currentTimeMillis() > startTime + waitTime)
+        return result -> false
+      Thread.sleep(waitTime.min(pause))
+    }
+    // should never hit here
+    throw new RuntimeException("unexpected error")
+  }
+
   def isLeaderLocalOnBroker(topic: String, partitionId: Int, server: KafkaServer): Boolean = {
     server.replicaManager.getPartition(new TopicPartition(topic, partitionId)).exists(_.leaderReplicaIfLocal.isDefined)
   }