You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:23 UTC

[26/37] git commit: KAFKA-1544 Log cleaner takes a long time to shut down. Patch from Manikumar Reddy.

KAFKA-1544 Log cleaner takes a long time to shut down. Patch from Manikumar Reddy.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/db41f98e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/db41f98e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/db41f98e

Branch: refs/heads/transactional_messaging
Commit: db41f98ea943d806362487b8f8192360a488eb39
Parents: 3f1a9c4
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu Jul 24 15:53:24 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu Jul 24 16:34:40 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/db41f98e/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 2faa196..afbeffc 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -28,6 +28,8 @@ import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.lang.IllegalStateException
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
@@ -173,7 +175,8 @@ class LogCleaner(val config: CleanerConfig,
                               checkDone = checkDone)
     
     @volatile var lastStats: CleanerStats = new CleanerStats()
-    
+    private val backOffWaitLatch = new CountDownLatch(1)
+
     private def checkDone(topicAndPartition: TopicAndPartition) {
       if (!isRunning.get())
         throw new ThreadShutdownException
@@ -187,6 +190,13 @@ class LogCleaner(val config: CleanerConfig,
       cleanOrSleep()
     }
     
+    
+    override def shutdown() = {
+    	 initiateShutdown()
+    	 backOffWaitLatch.countDown()
+    	 awaitShutdown()
+     }
+     
     /**
      * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
@@ -194,7 +204,7 @@ class LogCleaner(val config: CleanerConfig,
       cleanerManager.grabFilthiestLog() match {
         case None =>
           // there are no cleanable logs, sleep a while
-          time.sleep(config.backOffMs)
+          backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
         case Some(cleanable) =>
           // there's a log, clean it
           var endOffset = cleanable.firstDirtyOffset