You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/06 12:32:31 UTC
kafka git commit: MINOR: Create offsets topic explicitly in
DescribeConsumerGroupTest
Repository: kafka
Updated Branches:
refs/heads/trunk 17b2bde4b -> 313f8d7dd
MINOR: Create offsets topic explicitly in DescribeConsumerGroupTest
This should fix transient failures due to timeouts caused by slow
auto-creation of the offsets topic. The one exception is
`testDescribeGroupWithNewConsumerWithShortInitializationTimeout`, where
we want initialisation to take longer than the timeout, so we let
the topic be auto-created. That test has also been a bit flaky, so I reduced
the timeout to 1 ms.
Also:
- Simplified resource handling
- Removed usage of EasyMock that didn't make sense.
- `findCoordinator` should retry if `sendAnyNode` throws an exception
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #3237 from ijuma/kafka-49480-describe-consumer-group-test-failure
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/313f8d7d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/313f8d7d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/313f8d7d
Branch: refs/heads/trunk
Commit: 313f8d7ddc48943d9f2290a27751d7a0fa1d3a82
Parents: 17b2bde
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Jun 6 13:31:36 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Jun 6 13:31:46 2017 +0100
----------------------------------------------------------------------
.../main/scala/kafka/admin/AdminClient.scala | 36 ++-
.../other/kafka/ReplicationQuotasTestRig.scala | 5 -
.../kafka/admin/DescribeConsumerGroupTest.scala | 284 ++++++++-----------
.../kafka/admin/ReplicationQuotaUtils.scala | 4 +-
.../kafka/server/ReplicationQuotasTest.scala | 4 +-
...rivenReplicationProtocolAcceptanceTest.scala | 2 +-
.../epoch/LeaderEpochIntegrationTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 28 +-
8 files changed, 174 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 4410e94..7bd626a 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -16,14 +16,14 @@ import java.io.IOException
import java.nio.ByteBuffer
import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, Future}
+import java.util.concurrent.{ConcurrentLinkedQueue, Future, TimeUnit}
+
import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.common.KafkaException
import kafka.coordinator.group.GroupOverview
import kafka.utils.Logging
-
import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{RequestFutureAdapter, ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
+import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, RequestFutureAdapter}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.TimeoutException
@@ -38,7 +38,7 @@ import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
import scala.collection.JavaConverters._
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
/**
* A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
@@ -104,20 +104,32 @@ class AdminClient(val time: Time,
}
def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = {
+ val requestBuilder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId)
+
+ def sendRequest: Try[FindCoordinatorResponse] =
+ Try(sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse])
+
val startTime = time.milliseconds
- val requestBuilder = new FindCoordinatorRequest.Builder(org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType.GROUP, groupId)
- var response = sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse]
+ var response = sendRequest
+
+ while ((response.isFailure || response.get.error == Errors.COORDINATOR_NOT_AVAILABLE) &&
+ (time.milliseconds - startTime < timeoutMs)) {
- while (response.error == Errors.COORDINATOR_NOT_AVAILABLE && time.milliseconds - startTime < timeoutMs) {
Thread.sleep(retryBackoffMs)
- response = sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse]
+ response = sendRequest
}
- if (response.error == Errors.COORDINATOR_NOT_AVAILABLE)
- throw new TimeoutException("The consumer group command timed out while waiting for group to initialize: ", response.error.exception)
+ def timeoutException(cause: Throwable) =
+ throw new TimeoutException("The consumer group command timed out while waiting for group to initialize: ", cause)
- response.error.maybeThrow()
- response.node
+ response match {
+ case Failure(exception) => throw timeoutException(exception)
+ case Success(response) =>
+ if (response.error == Errors.COORDINATOR_NOT_AVAILABLE)
+ throw timeoutException(response.error.exception)
+ response.error.maybeThrow()
+ response.node
+ }
}
def listGroups(node: Node): List[GroupOverview] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index d8bc65e..8a26cac 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -187,11 +187,6 @@ object ReplicationQuotasTestRig {
println(s"Worst case duration is ${config.targetBytesPerBrokerMB * 1000 * 1000/ config.throttle}")
}
- private def waitForOffsetsToMatch(offset: Int, partitionId: Int, broker: KafkaServer, topic: String): Boolean = waitUntilTrue(() => {
- offset == broker.getLogManager.getLog(new TopicPartition(topic, partitionId))
- .map(_.logEndOffset).getOrElse(0)
- }, s"Offsets did not match for partition $partitionId on broker ${broker.config.brokerId}", 60000)
-
def waitForReassignmentToComplete() {
waitUntilTrue(() => {
printRateMetrics()
http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 6a68e52..d3f9573 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -22,184 +22,146 @@ import java.util.concurrent.TimeUnit
import java.util.Collections
import java.util.Properties
-import org.easymock.EasyMock
import org.junit.Assert._
-import org.junit.Before
-import org.junit.Test
-import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
-import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
-import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
+import org.junit.{After, Before, Test}
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService, ZkConsumerGroupService}
import kafka.consumer.OldConsumer
import kafka.consumer.Whitelist
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
-
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.errors.{CoordinatorNotAvailableException, WakeupException}
+import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
+import scala.collection.mutable.ArrayBuffer
class DescribeConsumerGroupTest extends KafkaServerTestHarness {
+ private val topic = "foo"
+ private val group = "test.group"
- val overridingProps = new Properties()
- val topic = "foo"
- val topicFilter = Whitelist(topic)
- val group = "test.group"
- val props = new Properties
+ @deprecated("This field will be removed in a future release", "0.11.0.0")
+ private val oldConsumers = new ArrayBuffer[OldConsumer]
+ private var consumerGroupService: ConsumerGroupService = _
+ private var consumerGroupExecutor: ConsumerGroupExecutor = _
// configure the servers and clients
- override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+ override def generateConfigs() = {
+ TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
+ KafkaConfig.fromProps(props)
+ }
+ }
@Before
override def setUp() {
super.setUp()
-
AdminUtils.createTopic(zkUtils, topic, 1, 1)
- props.setProperty("group.id", group)
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ if (consumerGroupService != null)
+ consumerGroupService.close()
+ if (consumerGroupExecutor != null)
+ consumerGroupExecutor.shutdown()
+ oldConsumers.foreach(_.stop())
+ super.tearDown()
}
@Test
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
def testDescribeNonExistingGroup() {
- // mocks
- props.setProperty("zookeeper.connect", zkConnect)
- val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
-
- // stubs
+ TestUtils.createOffsetsTopic(zkUtils, servers)
+ createOldConsumer()
val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group"))
- val consumerGroupCommand = new ZkConsumerGroupService(opts)
-
- // simulation
- EasyMock.replay(consumerMock)
-
- // action/test
- TestUtils.waitUntilTrue(() => consumerGroupCommand.describeGroup()._2.isEmpty, "Expected no rows in describe group results.")
-
- // cleanup
- consumerGroupCommand.close()
- consumerMock.stop()
+ consumerGroupService = new ZkConsumerGroupService(opts)
+ TestUtils.waitUntilTrue(() => consumerGroupService.describeGroup()._2.isEmpty, "Expected no rows in describe group results.")
}
@Test
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
def testDescribeExistingGroup() {
- // mocks
- props.setProperty("zookeeper.connect", zkConnect)
- val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
-
- // stubs
+ TestUtils.createOffsetsTopic(zkUtils, servers)
+ createOldConsumer()
val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
- val consumerGroupCommand = new ZkConsumerGroupService(opts)
-
- // simulation
- EasyMock.replay(consumerMock)
-
- // action/test
+ consumerGroupService = new ZkConsumerGroupService(opts)
TestUtils.waitUntilTrue(() => {
- val (_, assignments) = consumerGroupCommand.describeGroup()
- assignments.isDefined &&
- assignments.get.count(_.group == group) == 1 &&
- assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
- }, "Expected rows and a consumer id column in describe group results.")
-
- // cleanup
- consumerGroupCommand.close()
- consumerMock.stop()
+ val (_, assignments) = consumerGroupService.describeGroup()
+ assignments.isDefined &&
+ assignments.get.count(_.group == group) == 1 &&
+ assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+ }, "Expected rows and a consumer id column in describe group results.")
}
@Test
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
def testDescribeExistingGroupWithNoMembers() {
- // mocks
- props.setProperty("zookeeper.connect", zkConnect)
- val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
-
- // stubs
+ TestUtils.createOffsetsTopic(zkUtils, servers)
+ createOldConsumer()
val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
- val consumerGroupCommand = new ZkConsumerGroupService(opts)
-
- // simulation
- EasyMock.replay(consumerMock)
+ consumerGroupService = new ZkConsumerGroupService(opts)
- // action/test
TestUtils.waitUntilTrue(() => {
- val (_, assignments) = consumerGroupCommand.describeGroup()
- assignments.isDefined &&
- assignments.get.count(_.group == group) == 1 &&
- assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
- }, "Expected rows and a consumer id column in describe group results.")
- consumerMock.stop()
+ val (_, assignments) = consumerGroupService.describeGroup()
+ assignments.isDefined &&
+ assignments.get.count(_.group == group) == 1 &&
+ assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+ }, "Expected rows and a consumer id column in describe group results.")
+ oldConsumers.head.stop()
TestUtils.waitUntilTrue(() => {
- val (_, assignments) = consumerGroupCommand.describeGroup()
- assignments.isDefined &&
- assignments.get.count(_.group == group) == 1 &&
- assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) // the member should be gone
- }, "Expected no active member in describe group results.")
-
- // cleanup
- consumerGroupCommand.close()
+ val (_, assignments) = consumerGroupService.describeGroup()
+ assignments.isDefined &&
+ assignments.get.count(_.group == group) == 1 &&
+ assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) // the member should be gone
+ }, "Expected no active member in describe group results.")
}
@Test
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
def testDescribeConsumersWithNoAssignedPartitions() {
- // mocks
- props.setProperty("zookeeper.connect", zkConnect)
- val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
- val consumer2Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
-
- // stubs
+ TestUtils.createOffsetsTopic(zkUtils, servers)
+ createOldConsumer()
+ createOldConsumer()
val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
- val consumerGroupCommand = new ZkConsumerGroupService(opts)
-
- EasyMock.replay(consumer1Mock)
- EasyMock.replay(consumer2Mock)
-
- // action/test
+ consumerGroupService = new ZkConsumerGroupService(opts)
TestUtils.waitUntilTrue(() => {
- val (_, assignments) = consumerGroupCommand.describeGroup()
- assignments.isDefined &&
- assignments.get.count(_.group == group) == 2 &&
- assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
- assignments.get.count { x => x.group == group && !x.partition.isDefined } == 1
- }, "Expected rows for consumers with no assigned partitions in describe group results.")
-
- // cleanup
- consumerGroupCommand.close()
- consumer1Mock.stop()
- consumer2Mock.stop()
+ val (_, assignments) = consumerGroupService.describeGroup()
+ assignments.isDefined &&
+ assignments.get.count(_.group == group) == 2 &&
+ assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
+ assignments.get.count { x => x.group == group && x.partition.isEmpty } == 1
+ }, "Expected rows for consumers with no assigned partitions in describe group results.")
}
@Test
def testDescribeNonExistingGroupWithNewConsumer() {
+ TestUtils.createOffsetsTopic(zkUtils, servers)
// run one consumer in the group consuming from a single-partition topic
- val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+ consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
// note the group to be queried is a different (non-existing) group
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+ consumerGroupService = new KafkaConsumerGroupService(opts)
- val (state, assignments) = consumerGroupCommand.describeGroup()
+ val (state, assignments) = consumerGroupService.describeGroup()
assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List()))
- consumerGroupCommand.close()
- executor.shutdown()
}
@Test
def testDescribeExistingGroupWithNewConsumer() {
+ TestUtils.createOffsetsTopic(zkUtils, servers)
// run one consumer in the group consuming from a single-partition topic
- val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+ consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+ consumerGroupService = new KafkaConsumerGroupService(opts)
TestUtils.waitUntilTrue(() => {
- val (state, assignments) = consumerGroupCommand.describeGroup()
+ val (state, assignments) = consumerGroupService.describeGroup()
state == Some("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 1 &&
@@ -207,112 +169,110 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
}, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.")
-
- consumerGroupCommand.close()
- executor.shutdown()
}
@Test
def testDescribeExistingGroupWithNoMembersWithNewConsumer() {
+ TestUtils.createOffsetsTopic(zkUtils, servers)
// run one consumer in the group consuming from a single-partition topic
- val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+ consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+ consumerGroupService = new KafkaConsumerGroupService(opts)
TestUtils.waitUntilTrue(() => {
- val (state, _) = consumerGroupCommand.describeGroup()
+ val (state, _) = consumerGroupService.describeGroup()
state == Some("Stable")
}, "Expected the group to initially become stable.")
// stop the consumer so the group has no active member anymore
- executor.shutdown()
-
- TestUtils.waitUntilTrue(() => {
- val (state, assignments) = consumerGroupCommand.describeGroup()
- state == Some("Empty") &&
- assignments.isDefined &&
- assignments.get.count(_.group == group) == 1 &&
- assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
- assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
- assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
- }, "Expected no active member in describe group results.")
-
- consumerGroupCommand.close()
+ consumerGroupExecutor.shutdown()
+
+ val (result, succeeded) = TestUtils.computeUntilTrue(consumerGroupService.describeGroup()) { case (state, assignments) =>
+ val testGroupAssignments = assignments.toSeq.flatMap(_.filter(_.group == group))
+ def assignment = testGroupAssignments.head
+ state == Some("Empty") &&
+ testGroupAssignments.size == 1 &&
+ assignment.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
+ assignment.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+ assignment.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+ }
+ val (state, assignments) = result
+ assertTrue(s"Expected no active member in describe group results, state: $state, assignments: $assignments",
+ succeeded)
}
@Test
def testDescribeConsumersWithNoAssignedPartitionsWithNewConsumer() {
+ TestUtils.createOffsetsTopic(zkUtils, servers)
// run two consumers in the group consuming from a single-partition topic
- val executor = new ConsumerGroupExecutor(brokerList, 2, group, topic)
+ consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+ consumerGroupService = new KafkaConsumerGroupService(opts)
TestUtils.waitUntilTrue(() => {
- val (state, assignments) = consumerGroupCommand.describeGroup()
- state == Some("Stable") &&
+ val (state, assignments) = consumerGroupService.describeGroup()
+ state == Some("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
- assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 &&
- assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1
- }, "Expected rows for consumers with no assigned partitions in describe group results.")
-
- consumerGroupCommand.close()
- executor.shutdown()
+ assignments.get.count { x => x.group == group && x.partition.isDefined } == 1 &&
+ assignments.get.count { x => x.group == group && x.partition.isEmpty } == 1
+ }, "Expected rows for consumers with no assigned partitions in describe group results")
}
@Test
def testDescribeWithMultiPartitionTopicAndMultipleConsumersWithNewConsumer() {
+ TestUtils.createOffsetsTopic(zkUtils, servers)
val topic2 = "foo2"
AdminUtils.createTopic(zkUtils, topic2, 2, 1)
// run two consumers in the group consuming from a two-partition topic
- val executor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)
+ consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group)
val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+ consumerGroupService = new KafkaConsumerGroupService(opts)
TestUtils.waitUntilTrue(() => {
- val (state, assignments) = consumerGroupCommand.describeGroup()
+ val (state, assignments) = consumerGroupService.describeGroup()
state == Some("Stable") &&
assignments.isDefined &&
assignments.get.count(_.group == group) == 2 &&
assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
- assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0
+ assignments.get.count{ x => x.group == group && x.partition.isEmpty} == 0
}, "Expected two rows (one row per consumer) in describe group results.")
-
- consumerGroupCommand.close()
- executor.shutdown()
}
@Test
def testDescribeGroupWithNewConsumerWithShortInitializationTimeout() {
+ // Let creation of the offsets topic happen during group initialisation to ensure that initialization doesn't
+ // complete before the timeout expires
+
// run one consumer in the group consuming from a single-partition topic
- val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
+ consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic)
// set the group initialization timeout too low for the group to stabilize
- val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "10")
+ val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "1")
val opts = new ConsumerGroupCommandOptions(cgcArgs)
- val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+ consumerGroupService = new KafkaConsumerGroupService(opts)
try {
- consumerGroupCommand.describeGroup()
+ consumerGroupService.describeGroup()
fail("The consumer group command should fail due to low initialization timeout")
} catch {
- case _: TimeoutException =>
- // OK
- case e: Throwable =>
- fail("An unexpected exception occurred: " + e.getMessage)
- throw e
- } finally {
- consumerGroupCommand.close()
- executor.shutdown()
+ case _: TimeoutException => // OK
}
}
+
+ private def createOldConsumer(): Unit = {
+ val consumerProps = new Properties
+ consumerProps.setProperty("group.id", group)
+ consumerProps.setProperty("zookeeper.connect", zkConnect)
+ oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
+ }
}
@@ -330,7 +290,7 @@ class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) ex
while (true)
consumer.poll(Long.MaxValue)
} catch {
- case e: WakeupException => // OK
+ case _: WakeupException => // OK
} finally {
consumer.close()
}
@@ -344,24 +304,18 @@ class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) ex
class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) {
val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers)
- var consumers = List[ConsumerThread]()
+ private val consumers = new ArrayBuffer[ConsumerThread]()
for (i <- 1 to numConsumers) {
val consumer = new ConsumerThread(broker, i, groupId, topic)
- consumers ++= List(consumer)
+ consumers += consumer
executor.submit(consumer)
}
- Runtime.getRuntime().addShutdownHook(new Thread() {
- override def run() {
- shutdown()
- }
- })
-
def shutdown() {
consumers.foreach(_.shutdown)
executor.shutdown()
try {
- executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+ executor.awaitTermination(5000, TimeUnit.MILLISECONDS)
} catch {
case e: InterruptedException =>
e.printStackTrace()
http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
index 9608892..bdabd67 100644
--- a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
@@ -20,7 +20,7 @@ import scala.collection.Seq
object ReplicationQuotaUtils {
- def checkThrottleConfigRemovedFromZK(topic: String, servers: Seq[KafkaServer]): Boolean = {
+ def checkThrottleConfigRemovedFromZK(topic: String, servers: Seq[KafkaServer]): Unit = {
TestUtils.waitUntilTrue(() => {
val hasRateProp = servers.forall { server =>
val brokerConfig = AdminUtils.fetchEntityConfig(server.zkUtils, ConfigType.Broker, server.config.brokerId.toString)
@@ -34,7 +34,7 @@ object ReplicationQuotaUtils {
}, "Throttle limit/replicas was not unset")
}
- def checkThrottleConfigAddedToZK(expectedThrottleRate: Long, servers: Seq[KafkaServer], topic: String, throttledLeaders: String, throttledFollowers: String): Boolean = {
+ def checkThrottleConfigAddedToZK(expectedThrottleRate: Long, servers: Seq[KafkaServer], topic: String, throttledLeaders: String, throttledFollowers: String): Unit = {
TestUtils.waitUntilTrue(() => {
//Check for limit in ZK
val brokerConfigAvailable = servers.forall { server =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 5fc4c0f..e7d2a64 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -208,13 +208,13 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
throttledTook < expectedDuration * 1000 * 1.5)
}
- def addData(msgCount: Int, msg: Array[Byte]): Boolean = {
+ def addData(msgCount: Int, msg: Array[Byte]): Unit = {
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
(0 until msgCount).map(_ => producer.send(new ProducerRecord(topic, msg))).foreach(_.get)
waitForOffsetsToMatch(msgCount, 0, 100)
}
- private def waitForOffsetsToMatch(offset: Int, partitionId: Int, brokerId: Int): Boolean = {
+ private def waitForOffsetsToMatch(offset: Int, partitionId: Int, brokerId: Int): Unit = {
waitUntilTrue(() => {
offset == brokerFor(brokerId).getLogManager.getLog(new TopicPartition(topic, partitionId))
.map(_.logEndOffset).getOrElse(0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 182e904..93505c2 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -368,7 +368,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
.records.batches().asScala.toSeq.last
}
- private def awaitISR(tp: TopicPartition): Boolean = {
+ private def awaitISR(tp: TopicPartition): Unit = {
TestUtils.waitUntilTrue(() => {
leader.replicaManager.getReplicaOrException(tp).partition.inSyncReplicas.map(_.brokerId).size == 2
}, "")
http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index f7110ee..5e069a1 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -219,7 +219,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
new ReplicaFetcherBlockingSend(endPoint, from.config, new Metrics(), new SystemTime(), 42, "TestFetcher")
}
- private def waitForEpochChangeTo(topic: String, partition: Int, epoch: Int): Boolean = {
+ private def waitForEpochChangeTo(topic: String, partition: Int, epoch: Int): Unit = {
TestUtils.waitUntilTrue(() => {
brokers(0).metadataCache.getPartitionInfo(topic, partition) match {
case Some(m) => m.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch == epoch
http://git-wip-us.apache.org/repos/asf/kafka/blob/313f8d7d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 14edd35..a2c9b05 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -22,7 +22,7 @@ import java.nio._
import java.nio.channels._
import java.nio.charset.{Charset, StandardCharsets}
import java.security.cert.X509Certificate
-import java.util.{ArrayList, Collections, Properties}
+import java.util.{Collections, Properties}
import java.util.concurrent.{Callable, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager
@@ -838,11 +838,11 @@ object TestUtils extends Logging {
* Wait until the given condition is true or throw an exception if the given wait time elapses.
*/
def waitUntilTrue(condition: () => Boolean, msg: => String,
- waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Boolean = {
+ waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Unit = {
val startTime = System.currentTimeMillis()
while (true) {
if (condition())
- return true
+ return
if (System.currentTimeMillis() > startTime + waitTime)
fail(msg)
Thread.sleep(waitTime.min(pause))
@@ -851,6 +851,28 @@ object TestUtils extends Logging {
throw new RuntimeException("unexpected error")
}
+ /**
+ * Invoke `compute` until `predicate` is true or `waitTime` elapses.
+ *
+ * Return the last `compute` result and a boolean indicating whether `predicate` succeeded for that value.
+ *
+ * This method is useful in cases where `waitUntilTrue` makes it awkward to provide good error messages.
+ */
+ def computeUntilTrue[T](compute: => T, waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(
+ predicate: T => Boolean): (T, Boolean) = {
+ val startTime = System.currentTimeMillis()
+ while (true) {
+ val result = compute
+ if (predicate(result))
+ return result -> true
+ if (System.currentTimeMillis() > startTime + waitTime)
+ return result -> false
+ Thread.sleep(waitTime.min(pause))
+ }
+ // should never hit here
+ throw new RuntimeException("unexpected error")
+ }
+
def isLeaderLocalOnBroker(topic: String, partitionId: Int, server: KafkaServer): Boolean = {
server.replicaManager.getPartition(new TopicPartition(topic, partitionId)).exists(_.leaderReplicaIfLocal.isDefined)
}