You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/12/04 13:53:50 UTC

[kafka] branch 2.4 updated: KAFKA-9265: Fix kafka.log.Log instance leak on log deletion (#7773)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 39ffc0d  KAFKA-9265: Fix kafka.log.Log instance leak on log deletion (#7773)
39ffc0d is described below

commit 39ffc0d2e56c9139970121740c77381f9555d3ef
Author: Vikas Singh <vi...@confluent.io>
AuthorDate: Wed Dec 4 05:52:29 2019 -0800

    KAFKA-9265: Fix kafka.log.Log instance leak on log deletion (#7773)
    
    KAFKA-8448 fixes problem with similar leak. The Log objects are being
    held in ScheduledExecutor PeriodicProducerExpirationCheck callback. The
    fix in KAFKA-8448 was to change the policy of ScheduledExecutor to
    remove the scheduled task when it gets canceled (by calling
    setRemoveOnCancelPolicy(true)).
    
    This works when a log is closed using close() method. But when a log is
    deleted either when the topic gets deleted or when the rebalancing
    operation moves the replica away from broker, the delete() operation is
    invoked. Log.delete() doesn't close the pending scheduled task and that
    leaks Log instance.
    
    Fix is to close the scheduled task in the Log.delete() method too.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/log/Log.scala            |  1 +
 .../main/scala/kafka/utils/KafkaScheduler.scala    |  2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 24 ++++++++++++++++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6514aa2..42697c7 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -2005,6 +2005,7 @@ class Log(@volatile var dir: File,
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
         removeLogMetrics()
+        producerExpireCheck.cancel(true)
         removeAndDeleteSegments(logSegments, asyncDelete = false)
         leaderEpochCache.foreach(_.clear())
         Utils.delete(dir)
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index f7f5995..a175fde 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -130,7 +130,7 @@ class KafkaScheduler(val threads: Int,
   /**
    * Package private for testing.
    */
-  private[utils] def taskRunning(task: ScheduledFuture[_]): Boolean = {
+  private[kafka] def taskRunning(task: ScheduledFuture[_]): Boolean = {
     executor.getQueue().contains(task)
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a948cf3..4dba8ba 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -611,6 +611,30 @@ class LogTest {
     assertFalse(logDir.exists)
   }
 
+  /**
+   * Test that "PeriodicProducerExpirationCheck" scheduled task gets canceled after log
+   * is deleted.
+   */
+  @Test
+  def testProducerExpireCheckAfterDelete(): Unit = {
+    val scheduler = new KafkaScheduler(1)
+    try {
+      scheduler.startup()
+      val logConfig = LogTest.createLogConfig()
+      val log = createLog(logDir, logConfig, scheduler = scheduler)
+
+      val producerExpireCheck = log.producerExpireCheck
+      assertTrue("producerExpireCheck isn't as part of scheduled tasks",
+        scheduler.taskRunning(producerExpireCheck))
+
+      log.delete()
+      assertFalse("producerExpireCheck is part of scheduled tasks even after log deletion",
+        scheduler.taskRunning(producerExpireCheck))
+    } finally {
+      scheduler.shutdown();
+    }
+  }
+
   private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion)
     var log = createLog(logDir, logConfig)