You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:23 UTC
[26/37] git commit: KAFKA-1544 Log cleaner takes a long time to shut
down. Patch from Manikumar Reddy.
KAFKA-1544 Log cleaner takes a long time to shut down. Patch from Manikumar Reddy.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/db41f98e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/db41f98e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/db41f98e
Branch: refs/heads/transactional_messaging
Commit: db41f98ea943d806362487b8f8192360a488eb39
Parents: 3f1a9c4
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu Jul 24 15:53:24 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu Jul 24 16:34:40 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogCleaner.scala | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/db41f98e/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 2faa196..afbeffc 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -28,6 +28,8 @@ import kafka.utils._
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.lang.IllegalStateException
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
/**
* The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
@@ -173,7 +175,8 @@ class LogCleaner(val config: CleanerConfig,
checkDone = checkDone)
@volatile var lastStats: CleanerStats = new CleanerStats()
-
+ private val backOffWaitLatch = new CountDownLatch(1)
+
private def checkDone(topicAndPartition: TopicAndPartition) {
if (!isRunning.get())
throw new ThreadShutdownException
@@ -187,6 +190,13 @@ class LogCleaner(val config: CleanerConfig,
cleanOrSleep()
}
+
+ override def shutdown() = {
+ initiateShutdown()
+ backOffWaitLatch.countDown()
+ awaitShutdown()
+ }
+
/**
* Clean a log if there is a dirty log available, otherwise sleep for a bit
*/
@@ -194,7 +204,7 @@ class LogCleaner(val config: CleanerConfig,
cleanerManager.grabFilthiestLog() match {
case None =>
// there are no cleanable logs, sleep a while
- time.sleep(config.backOffMs)
+ backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
case Some(cleanable) =>
// there's a log, clean it
var endOffset = cleanable.firstDirtyOffset