You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by di...@apache.org on 2023/07/14 08:22:21 UTC

[kafka] branch trunk updated: MINOR: Remove thread leak from ConsumerBounceTest (#13956)

This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4960a5ebe90 MINOR: Remove thread leak from ConsumerBounceTest (#13956)
4960a5ebe90 is described below

commit 4960a5ebe90c44b8d0616d328e022dac5067395b
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Fri Jul 14 10:22:14 2023 +0200

    MINOR: Remove thread leak from ConsumerBounceTest (#13956)
    
    This commit prevents the leak of the daemon-bounce-broker thread, which was causing failures in tests that check for leaked threads before running.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Justine Olshan <jo...@confluent.io>, Philip Nee <ph...@gmail.com>
---
 .../integration/kafka/api/ConsumerBounceTest.scala | 105 ++++++++++++---------
 1 file changed, 59 insertions(+), 46 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index e0e363ef4a6..26cf0ea6746 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -95,27 +95,30 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     consumer.subscribe(Collections.singletonList(topic))
 
     val scheduler = new BounceBrokerScheduler(numIters)
-    scheduler.start()
+    try {
+      scheduler.start()
 
-    while (scheduler.isRunning) {
-      val records = consumer.poll(Duration.ofMillis(100)).asScala
+      while (scheduler.isRunning) {
+        val records = consumer.poll(Duration.ofMillis(100)).asScala
 
-      for (record <- records) {
-        assertEquals(consumed, record.offset())
-        consumed += 1
-      }
+        for (record <- records) {
+          assertEquals(consumed, record.offset())
+          consumed += 1
+        }
 
-      if (records.nonEmpty) {
-        consumer.commitSync()
-        assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
+        if (records.nonEmpty) {
+          consumer.commitSync()
+          assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
 
-        if (consumer.position(tp) == numRecords) {
-          consumer.seekToBeginning(Collections.emptyList())
-          consumed = 0
+          if (consumer.position(tp) == numRecords) {
+            consumer.seekToBeginning(Collections.emptyList())
+            consumed = 0
+          }
         }
       }
+    } finally {
+      scheduler.shutdown()
     }
-    scheduler.shutdown()
   }
 
   @Test
@@ -136,24 +139,28 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     ), "Failed to update high watermark for followers after timeout")
 
     val scheduler = new BounceBrokerScheduler(numIters)
-    scheduler.start()
-
-    while(scheduler.isRunning) {
-      val coin = TestUtils.random.nextInt(3)
-      if (coin == 0) {
-        info("Seeking to end of log")
-        consumer.seekToEnd(Collections.emptyList())
-        assertEquals(numRecords.toLong, consumer.position(tp))
-      } else if (coin == 1) {
-        val pos = TestUtils.random.nextInt(numRecords).toLong
-        info("Seeking to " + pos)
-        consumer.seek(tp, pos)
-        assertEquals(pos, consumer.position(tp))
-      } else if (coin == 2) {
-        info("Committing offset.")
-        consumer.commitSync()
-        assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
+    try {
+      scheduler.start()
+
+      while (scheduler.isRunning) {
+        val coin = TestUtils.random.nextInt(3)
+        if (coin == 0) {
+          info("Seeking to end of log")
+          consumer.seekToEnd(Collections.emptyList())
+          assertEquals(numRecords.toLong, consumer.position(tp))
+        } else if (coin == 1) {
+          val pos = TestUtils.random.nextInt(numRecords).toLong
+          info("Seeking to " + pos)
+          consumer.seek(tp, pos)
+          assertEquals(pos, consumer.position(tp))
+        } else if (coin == 2) {
+          info("Committing offset.")
+          consumer.commitSync()
+          assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
+        }
       }
+    } finally {
+      scheduler.shutdown()
     }
   }
 
@@ -344,21 +351,26 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount)
 
     // Create N+1 consumers in the same consumer group and assert that the N+1th consumer receives a fatal error when it tries to join the group
-    addConsumersToGroupAndWaitForGroupAssignment(maxGroupSize, mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](),
-      consumerPollers, List[String](topic), partitions, group)
-    val (_, rejectedConsumerPollers) = addConsumersToGroup(1,
-      mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](), mutable.Buffer[ConsumerAssignmentPoller](), List[String](topic), partitions, group)
-    val rejectedConsumer = rejectedConsumerPollers.head
-    TestUtils.waitUntilTrue(() => {
-      rejectedConsumer.thrownException.isDefined
-    }, "Extra consumer did not throw an exception")
-    assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException])
-
-    // assert group continues to live
-    producerSend(createProducer(), maxGroupSize * 100, topic, numPartitions = Some(partitions.size))
-    TestUtils.waitUntilTrue(() => {
-      consumerPollers.forall(p => p.receivedMessages >= 100)
-    }, "The consumers in the group could not fetch the expected records", 10000L)
+    val consumerPollers = mutable.Buffer[ConsumerAssignmentPoller]()
+    try {
+      addConsumersToGroupAndWaitForGroupAssignment(maxGroupSize, mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](),
+        consumerPollers, List[String](topic), partitions, group)
+      val (_, rejectedConsumerPollers) = addConsumersToGroup(1,
+        mutable.Buffer[Consumer[Array[Byte], Array[Byte]]](), mutable.Buffer[ConsumerAssignmentPoller](), List[String](topic), partitions, group)
+      val rejectedConsumer = rejectedConsumerPollers.head
+      TestUtils.waitUntilTrue(() => {
+        rejectedConsumer.thrownException.isDefined
+      }, "Extra consumer did not throw an exception")
+      assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException])
+
+      // assert group continues to live
+      producerSend(createProducer(), maxGroupSize * 100, topic, numPartitions = Some(partitions.size))
+      TestUtils.waitUntilTrue(() => {
+        consumerPollers.forall(p => p.receivedMessages >= 100)
+      }, "The consumers in the group could not fetch the expected records", 10000L)
+    } finally {
+      consumerPollers.foreach(_.shutdown())
+    }
   }
 
   /**
@@ -437,6 +449,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
       else
         subscribeConsumerAndStartPolling(consumer, List(topic))
 
+    consumerPollers += consumerPoller
     receiveExactRecords(consumerPoller, numRecords)
     consumerPoller.shutdown()
     consumer