You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by di...@apache.org on 2023/07/14 08:22:21 UTC
[kafka] branch trunk updated: MINOR: Remove thread leak from ConsumerBounceTest (#13956)
This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4960a5ebe90 MINOR: Remove thread leak from ConsumerBounceTest (#13956)
4960a5ebe90 is described below
commit 4960a5ebe90c44b8d0616d328e022dac5067395b
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Fri Jul 14 10:22:14 2023 +0200
MINOR: Remove thread leak from ConsumerBounceTest (#13956)
This commit prevents the leak of the daemon-bounce-broker thread, which was causing test failures for tests that check for thread leaks prior to running.
Reviewers: Luke Chen <sh...@gmail.com>, Justine Olshan <jo...@confluent.io>, Philip Nee <ph...@gmail.com>
---
.../integration/kafka/api/ConsumerBounceTest.scala | 105 ++++++++++++---------
1 file changed, 59 insertions(+), 46 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index e0e363ef4a6..26cf0ea6746 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -95,27 +95,30 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
consumer.subscribe(Collections.singletonList(topic))
val scheduler = new BounceBrokerScheduler(numIters)
- scheduler.start()
+ try {
+ scheduler.start()
- while (scheduler.isRunning) {
- val records = consumer.poll(Duration.ofMillis(100)).asScala
+ while (scheduler.isRunning) {
+ val records = consumer.poll(Duration.ofMillis(100)).asScala
- for (record <- records) {
- assertEquals(consumed, record.offset())
- consumed += 1
- }
+ for (record <- records) {
+ assertEquals(consumed, record.offset())
+ consumed += 1
+ }
- if (records.nonEmpty) {
- consumer.commitSync()
- assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
+ if (records.nonEmpty) {
+ consumer.commitSync()
+ assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
- if (consumer.position(tp) == numRecords) {
- consumer.seekToBeginning(Collections.emptyList())
- consumed = 0
+ if (consumer.position(tp) == numRecords) {
+ consumer.seekToBeginning(Collections.emptyList())
+ consumed = 0
+ }
}
}
+ } finally {
+ scheduler.shutdown()
}
- scheduler.shutdown()
}
@Test
@@ -136,24 +139,28 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
), "Failed to update high watermark for followers after timeout")
val scheduler = new BounceBrokerScheduler(numIters)
- scheduler.start()
-
- while(scheduler.isRunning) {
- val coin = TestUtils.random.nextInt(3)
- if (coin == 0) {
- info("Seeking to end of log")
- consumer.seekToEnd(Collections.emptyList())
- assertEquals(numRecords.toLong, consumer.position(tp))
- } else if (coin == 1) {
- val pos = TestUtils.random.nextInt(numRecords).toLong
- info("Seeking to " + pos)
- consumer.seek(tp, pos)
- assertEquals(pos, consumer.position(tp))
- } else if (coin == 2) {
- info("Committing offset.")
- consumer.commitSync()
- assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
+ try {
+ scheduler.start()
+
+ while (scheduler.isRunning) {
+ val coin = TestUtils.random.nextInt(3)
+ if (coin == 0) {
+ info("Seeking to end of log")
+ consumer.seekToEnd(Collections.emptyList())
+ assertEquals(numRecords.toLong, consumer.position(tp))
+ } else if (coin == 1) {
+ val pos = TestUtils.random.nextInt(numRecords).toLong
+ info("Seeking to " + pos)
+ consumer.seek(tp, pos)
+ assertEquals(pos, consumer.position(tp))
+ } else if (coin == 2) {
+ info("Committing offset.")
+ consumer.commitSync()
+ assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
+ }
}
+ } finally {
+ scheduler.shutdown()
}
}
@@ -344,21 +351,26 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount)
// Create N+1 consumers in the same consumer group and assert that the N+1th consumer receives a fatal error when it tries to join the group
- addConsumersToGroupAndWaitForGroupAssignment(maxGroupSize, mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](),
- consumerPollers, List[String](topic), partitions, group)
- val (_, rejectedConsumerPollers) = addConsumersToGroup(1,
- mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](), mutable.Buffer[ConsumerAssignmentPoller](), List[String](topic), partitions, group)
- val rejectedConsumer = rejectedConsumerPollers.head
- TestUtils.waitUntilTrue(() => {
- rejectedConsumer.thrownException.isDefined
- }, "Extra consumer did not throw an exception")
- assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException])
-
- // assert group continues to live
- producerSend(createProducer(), maxGroupSize * 100, topic, numPartitions = Some(partitions.size))
- TestUtils.waitUntilTrue(() => {
- consumerPollers.forall(p => p.receivedMessages >= 100)
- }, "The consumers in the group could not fetch the expected records", 10000L)
+ val consumerPollers = mutable.Buffer[ConsumerAssignmentPoller]()
+ try {
+ addConsumersToGroupAndWaitForGroupAssignment(maxGroupSize, mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](),
+ consumerPollers, List[String](topic), partitions, group)
+ val (_, rejectedConsumerPollers) = addConsumersToGroup(1,
+ mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](), mutable.Buffer[ConsumerAssignmentPoller](), List[String](topic), partitions, group)
+ val rejectedConsumer = rejectedConsumerPollers.head
+ TestUtils.waitUntilTrue(() => {
+ rejectedConsumer.thrownException.isDefined
+ }, "Extra consumer did not throw an exception")
+ assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException])
+
+ // assert group continues to live
+ producerSend(createProducer(), maxGroupSize * 100, topic, numPartitions = Some(partitions.size))
+ TestUtils.waitUntilTrue(() => {
+ consumerPollers.forall(p => p.receivedMessages >= 100)
+ }, "The consumers in the group could not fetch the expected records", 10000L)
+ } finally {
+ consumerPollers.foreach(_.shutdown())
+ }
}
/**
@@ -437,6 +449,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
else
subscribeConsumerAndStartPolling(consumer, List(topic))
+ consumerPollers += consumerPoller
receiveExactRecords(consumerPoller, numRecords)
consumerPoller.shutdown()
consumer