You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/12/04 13:53:50 UTC
[kafka] branch 2.4 updated: KAFKA-9265: Fix kafka.log.Log instance
leak on log deletion (#7773)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 39ffc0d KAFKA-9265: Fix kafka.log.Log instance leak on log deletion (#7773)
39ffc0d is described below
commit 39ffc0d2e56c9139970121740c77381f9555d3ef
Author: Vikas Singh <vi...@confluent.io>
AuthorDate: Wed Dec 4 05:52:29 2019 -0800
KAFKA-9265: Fix kafka.log.Log instance leak on log deletion (#7773)
KAFKA-8448 fixes problem with similar leak. The Log objects are being
held in ScheduledExecutor PeriodicProducerExpirationCheck callback. The
fix in KAFKA-8448 was to change the policy of ScheduledExecutor to
remove the scheduled task when it gets canceled (by calling
setRemoveOnCancelPolicy(true)).
This works when a log is closed using close() method. But when a log is
deleted either when the topic gets deleted or when the rebalancing
operation moves the replica away from broker, the delete() operation is
invoked. Log.delete() doesn't close the pending scheduled task and that
leaks Log instance.
Fix is to close the scheduled task in the Log.delete() method too.
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/log/Log.scala | 1 +
.../main/scala/kafka/utils/KafkaScheduler.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 24 ++++++++++++++++++++++
3 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6514aa2..42697c7 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -2005,6 +2005,7 @@ class Log(@volatile var dir: File,
lock synchronized {
checkIfMemoryMappedBufferClosed()
removeLogMetrics()
+ producerExpireCheck.cancel(true)
removeAndDeleteSegments(logSegments, asyncDelete = false)
leaderEpochCache.foreach(_.clear())
Utils.delete(dir)
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index f7f5995..a175fde 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -130,7 +130,7 @@ class KafkaScheduler(val threads: Int,
/**
* Package private for testing.
*/
- private[utils] def taskRunning(task: ScheduledFuture[_]): Boolean = {
+ private[kafka] def taskRunning(task: ScheduledFuture[_]): Boolean = {
executor.getQueue().contains(task)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a948cf3..4dba8ba 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -611,6 +611,30 @@ class LogTest {
assertFalse(logDir.exists)
}
+ /**
+ * Test that "PeriodicProducerExpirationCheck" scheduled task gets canceled after log
+ * is deleted.
+ */
+ @Test
+ def testProducerExpireCheckAfterDelete(): Unit = {
+ val scheduler = new KafkaScheduler(1)
+ try {
+ scheduler.startup()
+ val logConfig = LogTest.createLogConfig()
+ val log = createLog(logDir, logConfig, scheduler = scheduler)
+
+ val producerExpireCheck = log.producerExpireCheck
+ assertTrue("producerExpireCheck isn't as part of scheduled tasks",
+ scheduler.taskRunning(producerExpireCheck))
+
+ log.delete()
+ assertFalse("producerExpireCheck is part of scheduled tasks even after log deletion",
+ scheduler.taskRunning(producerExpireCheck))
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion)
var log = createLog(logDir, logConfig)