You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/12/04 05:05:54 UTC

[kafka] branch 2.3 updated: KAFKA-8448: Cancel PeriodicProducerExpirationCheck when closing a Log instance (#6847)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new f1a67e6  KAFKA-8448: Cancel PeriodicProducerExpirationCheck when closing a Log instance (#6847)
f1a67e6 is described below

commit f1a67e6234147b6c38259f7bb35b33023d4679e0
Author: jolshan <jo...@confluent.io>
AuthorDate: Tue Jun 18 10:52:59 2019 -0700

    KAFKA-8448: Cancel PeriodicProducerExpirationCheck when closing a Log instance (#6847)
    
    Cancel PeriodicProducerExpirationCheck when closing a Log instance, to avoid a memory leak.  Add a method to KafkaScheduler to make this possible.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/log/Log.scala            |  3 +-
 .../main/scala/kafka/utils/KafkaScheduler.scala    | 13 +++++-
 .../scala/unit/kafka/utils/MockScheduler.scala     | 49 ++++++++++++++++++++--
 .../scala/unit/kafka/utils/SchedulerTest.scala     | 24 ++++++++++-
 4 files changed, 81 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 3291e1d..6ca83f8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -333,7 +333,7 @@ class Log(@volatile var dir: File,
     },
     tags)
 
-  scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
+  val producerExpireCheck = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
     lock synchronized {
       producerStateManager.removeExpiredProducers(time.milliseconds)
     }
@@ -762,6 +762,7 @@ class Log(@volatile var dir: File,
     debug("Closing log")
     lock synchronized {
       checkIfMemoryMappedBufferClosed()
+      producerExpireCheck.cancel(true)
       maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
         // We take a snapshot at the last written offset to hopefully avoid the need to scan the log
         // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index b4fae0b..cee4478 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -51,8 +51,9 @@ trait Scheduler {
    * @param delay The amount of time to wait before the first execution
    * @param period The period with which to execute the task. If < 0 the task will execute only once.
    * @param unit The unit for the preceding times.
+   * @return A Future object to manage the task scheduled.
    */
-  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)
+  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : ScheduledFuture[_]
 }
 
 /**
@@ -79,6 +80,7 @@ class KafkaScheduler(val threads: Int,
       executor = new ScheduledThreadPoolExecutor(threads)
       executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
       executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+      executor.setRemoveOnCancelPolicy(true)
       executor.setThreadFactory(new ThreadFactory() {
                                   def newThread(runnable: Runnable): Thread = 
                                     new KafkaThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
@@ -103,7 +105,7 @@ class KafkaScheduler(val threads: Int,
     schedule(name, fun, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
   }
 
-  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit) {
+  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): ScheduledFuture[_] = {
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
@@ -125,6 +127,13 @@ class KafkaScheduler(val threads: Int,
     }
   }
 
+  /**
+   * Package private for testing.
+   */
+  private[utils] def taskRunning(task: ScheduledFuture[_]): Boolean = {
+    executor.getQueue().contains(task)
+  }
+
   def resizeThreadPool(newSize: Int): Unit = {
     executor.setCorePoolSize(newSize)
   }
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index 5ebdf40..523be95 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -17,7 +17,7 @@
 package kafka.utils
 
 import scala.collection.mutable.PriorityQueue
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Delayed, ScheduledFuture, TimeUnit}
 
 import org.apache.kafka.common.utils.Time
 
@@ -71,11 +71,14 @@ class MockScheduler(val time: Time) extends Scheduler {
     }
   }
   
-  def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) {
+  def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[Unit] = {
+    var task : MockTask = null
     this synchronized {
-      tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
+      task = MockTask(name, fun, time.milliseconds + delay, period = period, time=time)
+      tasks += task
       tick()
     }
+    task
   }
 
   def clear(): Unit = {
@@ -86,7 +89,7 @@ class MockScheduler(val time: Time) extends Scheduler {
   
 }
 
-case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long) extends Ordered[MockTask] {
+case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long, time: Time) extends ScheduledFuture[Unit] {
   def periodic = period >= 0
   def compare(t: MockTask): Int = {
     if(t.nextExecution == nextExecution)
@@ -96,4 +99,42 @@ case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, peri
     else
       1
   }
+
+  /**
+    * Not used, so not not fully implemented
+    */
+  def cancel(mayInterruptIfRunning: Boolean) : Boolean = {
+    false
+  }
+
+  def get() {
+  }
+
+  def get(timeout: Long, unit: TimeUnit){
+  }
+
+  def isCancelled: Boolean = {
+    false
+  }
+
+  def isDone: Boolean = {
+    false
+  }
+
+  def getDelay(unit: TimeUnit): Long = {
+    this synchronized {
+      time.milliseconds - nextExecution
+    }
+  }
+
+  def compareTo(o : Delayed) : Int = {
+    this.getDelay(TimeUnit.MILLISECONDS).compareTo(o.getDelay(TimeUnit.MILLISECONDS))
+  }
+}
+object MockTask {
+  implicit def MockTaskOrdering : Ordering[MockTask] = new Ordering[MockTask] {
+    def compare(x: MockTask, y: MockTask): Int = {
+      x.compare(y)
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index dbb818c..449d840 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -18,8 +18,11 @@ package kafka.utils
 
 import org.junit.Assert._
 import java.util.concurrent.atomic._
-import org.junit.{Test, After, Before}
+import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import org.junit.{After, Before, Test}
 import kafka.utils.TestUtils.retry
+import java.util.Properties
 
 class SchedulerTest {
 
@@ -107,4 +110,23 @@ class SchedulerTest {
     mockTime.sleep(1)
     assertEquals(2, counter1.get())
   }
+
+  @Test
+  def testUnscheduleProducerTask(): Unit = {
+    val tmpDir = TestUtils.tempDir()
+    val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+    val logConfig = LogConfig(new Properties())
+    val brokerTopicStats = new BrokerTopicStats
+    val recoveryPoint = 0L
+    val maxProducerIdExpirationMs = 60 * 60 * 1000
+    val topicPartition = Log.parseTopicPartitionName(logDir)
+    val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
+    val log = new Log(logDir, logConfig, logStartOffset = 0, recoveryPoint = recoveryPoint, scheduler,
+      brokerTopicStats, mockTime, maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs,
+      topicPartition, producerStateManager, new LogDirFailureChannel(10))
+    assertTrue(scheduler.taskRunning(log.producerExpireCheck))
+    log.close()
+    assertTrue(!(scheduler.taskRunning(log.producerExpireCheck)))
+  }
+
 }
\ No newline at end of file