You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2024/03/15 02:34:59 UTC

(kafka) branch 3.7 updated: KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test (#15523)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new d69f370f07c KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test (#15523)
d69f370f07c is described below

commit d69f370f07c3dd58770111a9d38c0fbb48ae1bbd
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Fri Mar 15 08:03:40 2024 +0530

    KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test (#15523)
    
    It is possible that due to resource constraint, ShutdownableThread#run might be called later than the ShutdownableThread#close method.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@amazon.com>
---
 .../unit/kafka/log/remote/RemoteIndexCacheTest.scala    |  2 ++
 .../scala/unit/kafka/utils/ShutdownableThreadTest.scala | 17 ++++++++++++++++-
 .../apache/kafka/server/util/ShutdownableThread.java    |  4 ++++
 3 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 48b01503bb6..ff65868674b 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -340,6 +340,8 @@ class RemoteIndexCacheTest {
     val spyEntry = generateSpyCacheEntry()
     cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
 
+    TestUtils.waitUntilTrue(() => cache.cleanerThread().isStarted, "Cleaner thread should be started")
+
     // close the cache
     cache.close()
 
diff --git a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
index 7b086d11509..da998903d27 100644
--- a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.server.util.ShutdownableThread
 import org.junit.jupiter.api.{AfterEach, Test}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 
 class ShutdownableThreadTest {
 
@@ -51,4 +51,19 @@ class ShutdownableThreadTest {
     assertEquals(1, statusCodeOption.get)
   }
 
+  @Test
+  def testIsThreadStarted(): Unit = {
+    val latch = new CountDownLatch(1)
+    val thread = new ShutdownableThread("shutdownable-thread-test") {
+      override def doWork(): Unit = {
+        latch.countDown()
+      }
+    }
+    assertFalse(thread.isStarted)
+    thread.start()
+    latch.await()
+    assertTrue(thread.isStarted)
+
+    thread.shutdown()
+  }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java b/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
index 06c751e0bb2..4268af92f84 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
@@ -68,6 +68,10 @@ public abstract class ShutdownableThread extends Thread {
         return shutdownComplete.getCount() == 0;
     }
 
+    public boolean isStarted() {
+        return isStarted;
+    }
+
     /**
      * @return true if there has been an unexpected error and the thread shut down
      */