You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2022/01/06 09:24:32 UTC

[kafka] branch trunk updated: MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0b9a8ba  MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653)
0b9a8ba is described below

commit 0b9a8bac36f16b5397e9ec3a0441758e4b60a384
Author: Lucas Bradstreet <lu...@gmail.com>
AuthorDate: Thu Jan 6 01:22:19 2022 -0800

    MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653)
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 core/src/main/scala/kafka/server/ClientQuotaManager.scala | 12 +++++++++++-
 core/src/main/scala/kafka/server/DelayedOperation.scala   | 11 +++++++++--
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5860cfc..7334519 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -562,8 +562,18 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       quotaMetricTags.asJava)
   }
 
+  def initiateShutdown(): Unit = {
+    throttledChannelReaper.initiateShutdown()
+    // improve shutdown time by waking up any ShutdownableThread(s) blocked on poll by sending a no-op
+    delayQueue.add(new ThrottledChannel(time, 0, new ThrottleCallback {
+      override def startThrottling(): Unit = {}
+      override def endThrottling(): Unit = {}
+    }))
+  }
+
   def shutdown(): Unit = {
-    throttledChannelReaper.shutdown()
+    initiateShutdown()
+    throttledChannelReaper.awaitShutdown()
   }
 
   class DefaultQuotaCallback extends ClientQuotaCallback {
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 251dd28..1151e65 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -328,8 +328,15 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    * Shutdown the expire reaper thread
    */
   def shutdown(): Unit = {
-    if (reaperEnabled)
-      expirationReaper.shutdown()
+    if (reaperEnabled) {
+      expirationReaper.initiateShutdown()
+      // improve shutdown time by waking up any ShutdownableThread(s) blocked on poll by sending a no-op
+      timeoutTimer.add(new TimerTask {
+        override val delayMs: Long = 0
+        override def run(): Unit = {}
+      })
+      expirationReaper.awaitShutdown()
+    }
     timeoutTimer.shutdown()
     removeMetric("PurgatorySize", metricsTags)
     removeMetric("NumDelayedOperations", metricsTags)