You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2022/01/06 09:24:32 UTC
[kafka] branch trunk updated: MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0b9a8ba MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653)
0b9a8ba is described below
commit 0b9a8bac36f16b5397e9ec3a0441758e4b60a384
Author: Lucas Bradstreet <lu...@gmail.com>
AuthorDate: Thu Jan 6 01:22:19 2022 -0800
MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653)
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
core/src/main/scala/kafka/server/ClientQuotaManager.scala | 12 +++++++++++-
core/src/main/scala/kafka/server/DelayedOperation.scala | 11 +++++++++--
2 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5860cfc..7334519 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -562,8 +562,18 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
quotaMetricTags.asJava)
}
+ def initiateShutdown(): Unit = {
+ throttledChannelReaper.initiateShutdown()
+ // improve shutdown time: enqueue a zero-delay no-op ThrottledChannel so that any ShutdownableThread(s) blocked on poll wake up promptly
+ delayQueue.add(new ThrottledChannel(time, 0, new ThrottleCallback {
+ override def startThrottling(): Unit = {}
+ override def endThrottling(): Unit = {}
+ }))
+ }
+
def shutdown(): Unit = {
- throttledChannelReaper.shutdown()
+ initiateShutdown()
+ throttledChannelReaper.awaitShutdown()
}
class DefaultQuotaCallback extends ClientQuotaCallback {
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 251dd28..1151e65 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -328,8 +328,15 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
* Shutdown the expire reaper thread
*/
def shutdown(): Unit = {
- if (reaperEnabled)
- expirationReaper.shutdown()
+ if (reaperEnabled) {
+ expirationReaper.initiateShutdown()
+ // improve shutdown time: add a zero-delay no-op TimerTask so that any ShutdownableThread(s) blocked on poll wake up promptly
+ timeoutTimer.add(new TimerTask {
+ override val delayMs: Long = 0
+ override def run(): Unit = {}
+ })
+ expirationReaper.awaitShutdown()
+ }
timeoutTimer.shutdown()
removeMetric("PurgatorySize", metricsTags)
removeMetric("NumDelayedOperations", metricsTags)