Posted to commits@kafka.apache.org by ij...@apache.org on 2019/12/04 05:05:54 UTC
[kafka] branch 2.3 updated: KAFKA-8448: Cancel PeriodicProducerExpirationCheck when closing a Log instance (#6847)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new f1a67e6 KAFKA-8448: Cancel PeriodicProducerExpirationCheck when closing a Log instance (#6847)
f1a67e6 is described below
commit f1a67e6234147b6c38259f7bb35b33023d4679e0
Author: jolshan <jo...@confluent.io>
AuthorDate: Tue Jun 18 10:52:59 2019 -0700
KAFKA-8448: Cancel PeriodicProducerExpirationCheck when closing a Log instance (#6847)
Cancel PeriodicProducerExpirationCheck when closing a Log instance, to avoid a memory leak. Add a method to KafkaScheduler to make this possible.
Reviewers: Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/log/Log.scala | 3 +-
.../main/scala/kafka/utils/KafkaScheduler.scala | 13 +++++-
.../scala/unit/kafka/utils/MockScheduler.scala | 49 ++++++++++++++++++++--
.../scala/unit/kafka/utils/SchedulerTest.scala | 24 ++++++++++-
4 files changed, 81 insertions(+), 8 deletions(-)
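The fix follows a familiar pattern: keep the ScheduledFuture returned when the periodic task is scheduled, and cancel it when the owning object is closed, so the shared executor's work queue no longer holds a reference to that object. A minimal sketch of the pattern with a plain ScheduledThreadPoolExecutor (the class and method names here are illustrative, not taken from the patch):

    import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

    // Illustrative only: a resource that schedules periodic cleanup on a shared
    // executor and must cancel it when closed; otherwise the executor's queue keeps
    // the task (and, through its closure, this whole object) reachable.
    class ExpiringStore(executor: ScheduledThreadPoolExecutor) extends AutoCloseable {
      // Keep the handle so the periodic task can be cancelled in close().
      private val expirationCheck: ScheduledFuture[_] =
        executor.scheduleAtFixedRate(new Runnable {
          def run(): Unit = removeExpiredEntries() // closes over `this`
        }, 0L, 10L, TimeUnit.MINUTES)

      private def removeExpiredEntries(): Unit = { /* drop expired entries */ }

      override def close(): Unit = {
        // Without the cancel, a long-lived executor would keep re-running the task
        // and pin the closed instance in memory, which is the leak fixed here for Log.
        expirationCheck.cancel(true)
      }
    }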
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 3291e1d..6ca83f8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -333,7 +333,7 @@ class Log(@volatile var dir: File,
},
tags)
- scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
+ val producerExpireCheck = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
lock synchronized {
producerStateManager.removeExpiredProducers(time.milliseconds)
}
@@ -762,6 +762,7 @@ class Log(@volatile var dir: File,
debug("Closing log")
lock synchronized {
checkIfMemoryMappedBufferClosed()
+ producerExpireCheck.cancel(true)
maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
// We take a snapshot at the last written offset to hopefully avoid the need to scan the log
// after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index b4fae0b..cee4478 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -51,8 +51,9 @@ trait Scheduler {
* @param delay The amount of time to wait before the first execution
* @param period The period with which to execute the task. If < 0 the task will execute only once.
* @param unit The unit for the preceding times.
+ * @return A Future object to manage the task scheduled.
*/
- def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)
+ def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) : ScheduledFuture[_]
}
/**
@@ -79,6 +80,7 @@ class KafkaScheduler(val threads: Int,
executor = new ScheduledThreadPoolExecutor(threads)
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+ executor.setRemoveOnCancelPolicy(true)
executor.setThreadFactory(new ThreadFactory() {
def newThread(runnable: Runnable): Thread =
new KafkaThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
@@ -103,7 +105,7 @@ class KafkaScheduler(val threads: Int,
schedule(name, fun, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
}
- def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit) {
+ def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): ScheduledFuture[_] = {
debug("Scheduling task %s with initial delay %d ms and period %d ms."
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
this synchronized {
@@ -125,6 +127,13 @@ class KafkaScheduler(val threads: Int,
}
}
+ /**
+ * Package private for testing.
+ */
+ private[utils] def taskRunning(task: ScheduledFuture[_]): Boolean = {
+ executor.getQueue().contains(task)
+ }
+
def resizeThreadPool(newSize: Int): Unit = {
executor.setCorePoolSize(newSize)
}
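On the scheduler side the patch does two things: schedule now hands back the ScheduledFuture so a caller can cancel its own task without touching the shared scheduler, and setRemoveOnCancelPolicy(true) makes the underlying ScheduledThreadPoolExecutor drop a cancelled task from its work queue immediately rather than keeping it (and everything its closure references) queued until its next scheduled run. A rough caller-side sketch of the resulting API; the object, task name, and doWork are placeholders, only the KafkaScheduler calls come from the patch:

    import java.util.concurrent.{ScheduledFuture, TimeUnit}
    import kafka.utils.KafkaScheduler

    object ScheduleAndCancelExample {
      def doWork(): Unit = { /* placeholder for the periodic work */ }

      def main(args: Array[String]): Unit = {
        val scheduler = new KafkaScheduler(threads = 1)
        scheduler.startup()

        // The returned future identifies this one task inside the shared scheduler.
        val handle: ScheduledFuture[_] = scheduler.schedule(
          name = "my-periodic-check", fun = () => doWork(),
          delay = 0L, period = 30 * 1000L, unit = TimeUnit.MILLISECONDS)

        // Later, when the component that owns the task is closed:
        handle.cancel(true) // dropped from the executor queue right away (removeOnCancelPolicy = true)

        // The scheduler keeps serving other components; shut it down separately when done.
        scheduler.shutdown()
      }
    }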
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index 5ebdf40..523be95 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -17,7 +17,7 @@
package kafka.utils
import scala.collection.mutable.PriorityQueue
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Delayed, ScheduledFuture, TimeUnit}
import org.apache.kafka.common.utils.Time
@@ -71,11 +71,14 @@ class MockScheduler(val time: Time) extends Scheduler {
}
}
- def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) {
+ def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[Unit] = {
+ var task : MockTask = null
this synchronized {
- tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
+ task = MockTask(name, fun, time.milliseconds + delay, period = period, time=time)
+ tasks += task
tick()
}
+ task
}
def clear(): Unit = {
@@ -86,7 +89,7 @@ class MockScheduler(val time: Time) extends Scheduler {
}
-case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long) extends Ordered[MockTask] {
+case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long, time: Time) extends ScheduledFuture[Unit] {
def periodic = period >= 0
def compare(t: MockTask): Int = {
if(t.nextExecution == nextExecution)
@@ -96,4 +99,42 @@ case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, peri
else
1
}
+
+ /**
+ * Not used, so not fully implemented
+ */
+ def cancel(mayInterruptIfRunning: Boolean) : Boolean = {
+ false
+ }
+
+ def get() {
+ }
+
+ def get(timeout: Long, unit: TimeUnit){
+ }
+
+ def isCancelled: Boolean = {
+ false
+ }
+
+ def isDone: Boolean = {
+ false
+ }
+
+ def getDelay(unit: TimeUnit): Long = {
+ this synchronized {
+ time.milliseconds - nextExecution
+ }
+ }
+
+ def compareTo(o : Delayed) : Int = {
+ this.getDelay(TimeUnit.MILLISECONDS).compareTo(o.getDelay(TimeUnit.MILLISECONDS))
+ }
+}
+object MockTask {
+ implicit def MockTaskOrdering : Ordering[MockTask] = new Ordering[MockTask] {
+ def compare(x: MockTask, y: MockTask): Int = {
+ x.compare(y)
+ }
+ }
}
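MockTask now satisfies ScheduledFuture[Unit] mostly as a stub (as its comment says, the unused methods are not fully implemented), which is enough for code written against the new Scheduler signature to compile and run under MockScheduler. A hypothetical test sketch using the returned handle, in the same MockTime-driven style as SchedulerTest below; the class and task names are illustrative:

    import java.util.concurrent.{ScheduledFuture, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger
    import kafka.utils.{MockScheduler, MockTime}
    import org.junit.Assert._
    import org.junit.Test

    class MockSchedulerHandleTest {
      @Test
      def testScheduleReturnsHandle(): Unit = {
        val mockTime = new MockTime()
        val mockScheduler = new MockScheduler(mockTime)
        val counter = new AtomicInteger(0)

        // schedule() now returns the MockTask itself, typed as ScheduledFuture[Unit].
        val task: ScheduledFuture[Unit] = mockScheduler.schedule(
          "periodic-check", () => counter.getAndIncrement(),
          delay = 1, period = 1, unit = TimeUnit.MILLISECONDS)

        mockTime.sleep(1)    // advance the mock clock past the first execution time
        mockScheduler.tick() // run whatever is now due
        assertEquals(1, counter.get())

        // The Future methods are stubs (cancel and isDone just return false);
        // tests drive execution through the mock clock, not through the future.
        assertFalse(task.isDone)
      }
    }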
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index dbb818c..449d840 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -18,8 +18,11 @@ package kafka.utils
import org.junit.Assert._
import java.util.concurrent.atomic._
-import org.junit.{Test, After, Before}
+import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import org.junit.{After, Before, Test}
import kafka.utils.TestUtils.retry
+import java.util.Properties
class SchedulerTest {
@@ -107,4 +110,23 @@ class SchedulerTest {
mockTime.sleep(1)
assertEquals(2, counter1.get())
}
+
+ @Test
+ def testUnscheduleProducerTask(): Unit = {
+ val tmpDir = TestUtils.tempDir()
+ val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+ val logConfig = LogConfig(new Properties())
+ val brokerTopicStats = new BrokerTopicStats
+ val recoveryPoint = 0L
+ val maxProducerIdExpirationMs = 60 * 60 * 1000
+ val topicPartition = Log.parseTopicPartitionName(logDir)
+ val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
+ val log = new Log(logDir, logConfig, logStartOffset = 0, recoveryPoint = recoveryPoint, scheduler,
+ brokerTopicStats, mockTime, maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs,
+ topicPartition, producerStateManager, new LogDirFailureChannel(10))
+ assertTrue(scheduler.taskRunning(log.producerExpireCheck))
+ log.close()
+ assertTrue(!(scheduler.taskRunning(log.producerExpireCheck)))
+ }
+
}
\ No newline at end of file