You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2024/03/15 02:34:59 UTC
(kafka) branch 3.7 updated: KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test (#15523)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new d69f370f07c KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test (#15523)
d69f370f07c is described below
commit d69f370f07c3dd58770111a9d38c0fbb48ae1bbd
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Fri Mar 15 08:03:40 2024 +0530
KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test (#15523)
Due to resource constraints, it is possible for ShutdownableThread#run to be invoked later than the ShutdownableThread#close method.
Reviewers: Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@amazon.com>
---
.../unit/kafka/log/remote/RemoteIndexCacheTest.scala | 2 ++
.../scala/unit/kafka/utils/ShutdownableThreadTest.scala | 17 ++++++++++++++++-
.../apache/kafka/server/util/ShutdownableThread.java | 4 ++++
3 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 48b01503bb6..ff65868674b 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -340,6 +340,8 @@ class RemoteIndexCacheTest {
val spyEntry = generateSpyCacheEntry()
cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+ TestUtils.waitUntilTrue(() => cache.cleanerThread().isStarted, "Cleaner thread should be started")
+
// close the cache
cache.close()
diff --git a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
index 7b086d11509..da998903d27 100644
--- a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.{AfterEach, Test}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
class ShutdownableThreadTest {
@@ -51,4 +51,19 @@ class ShutdownableThreadTest {
assertEquals(1, statusCodeOption.get)
}
+ @Test
+ def testIsThreadStarted(): Unit = {
+ val latch = new CountDownLatch(1)
+ val thread = new ShutdownableThread("shutdownable-thread-test") {
+ override def doWork(): Unit = {
+ latch.countDown()
+ }
+ }
+ assertFalse(thread.isStarted)
+ thread.start()
+ latch.await()
+ assertTrue(thread.isStarted)
+
+ thread.shutdown()
+ }
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java b/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
index 06c751e0bb2..4268af92f84 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
@@ -68,6 +68,10 @@ public abstract class ShutdownableThread extends Thread {
return shutdownComplete.getCount() == 0;
}
+ public boolean isStarted() {
+ return isStarted;
+ }
+
/**
* @return true if there has been an unexpected error and the thread shut down
*/