You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/04/22 02:23:21 UTC

git commit: KAFKA-1327; Log cleaner metrics follow-up patch to reset dirtiest log cleanable ratio; reviewed by Jun Rao (cherry picked from commit 874620d)

Repository: kafka
Updated Branches:
  refs/heads/trunk 89f040c8c -> 3af3efe37


KAFKA-1327; Log cleaner metrics follow-up patch to reset dirtiest log cleanable ratio; reviewed by Jun Rao
(cherry picked from commit 874620d)


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3af3efe3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3af3efe3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3af3efe3

Branch: refs/heads/trunk
Commit: 3af3efe3773348fd7adb8ca43f2abc5490416e55
Parents: 89f040c
Author: Joel Koshy <jj...@gmail.com>
Authored: Mon Apr 21 17:06:39 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Mon Apr 21 17:12:48 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala    | 10 ++++++----
 .../main/scala/kafka/log/LogCleanerManager.scala  | 18 ++++++++++--------
 2 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3af3efe3/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index b9ffe00..2faa196 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -19,7 +19,6 @@ package kafka.log
 
 import scala.collection._
 import scala.math
-import java.util.concurrent.TimeUnit
 import java.nio._
 import java.util.Date
 import java.io.File
@@ -215,6 +214,7 @@ class LogCleaner(val config: CleanerConfig,
      */
     def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
       this.lastStats = stats
+      cleaner.statsUnderlying.swap
       def mb(bytes: Double) = bytes / (1024*1024)
       val message = 
         "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) + 
@@ -260,9 +260,10 @@ private[log] class Cleaner(val id: Int,
 
   this.logIdent = "Cleaner " + id + ": "
   
-  /* stats on this cleaning */
-  val stats = new CleanerStats(time)
-  
+  /* cleaning stats - one instance for the current (or next) cleaning cycle and one for the last completed cycle */
+  val statsUnderlying = (new CleanerStats(time), new CleanerStats(time))
+  def stats = statsUnderlying._1
+
   /* buffer used for read i/o */
   private var readBuffer = ByteBuffer.allocate(ioBufferSize)
   
@@ -304,6 +305,7 @@ private[log] class Cleaner(val id: Int,
     stats.bufferUtilization = offsetMap.utilization
     
     stats.allDone()
+
     endOffset
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3af3efe3/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 514941c..e8ced6a 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -58,7 +58,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   private val pausedCleaningCond = lock.newCondition()
   
   /* a gauge for tracking the cleanable ratio of the dirtiest log */
-  private var dirtiestLogCleanableRatio = 0.0
+  @volatile private var dirtiestLogCleanableRatio = 0.0
   newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
 
   /**
@@ -80,8 +80,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
                           .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
                                                lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
                           .filter(l => l.totalBytes > 0)             // skip any empty logs
-      if(!dirtyLogs.isEmpty)
-        this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio
+      this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
       val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
       if(cleanableLogs.isEmpty) {
         None
@@ -126,7 +125,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
             case LogCleaningInProgress =>
               inProgress.put(topicAndPartition, LogCleaningAborted)
             case s =>
-              throw new IllegalStateException(("Partiiton %s can't be aborted and pasued since it's in %s state").format(topicAndPartition, s))
+              throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state."
+                                              .format(topicAndPartition, s))
           }
       }
       while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
@@ -142,17 +142,19 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     inLock(lock) {
       inProgress.get(topicAndPartition) match {
         case None =>
-          throw new IllegalStateException(("Partiiton %s can't be resumed since it's never paused").format(topicAndPartition))
+          throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused."
+                                          .format(topicAndPartition))
         case Some(state) =>
           state match {
             case LogCleaningPaused =>
               inProgress.remove(topicAndPartition)
             case s =>
-              throw new IllegalStateException(("Partiiton %s can't be resumed since it's in %s state").format(topicAndPartition, s))
+              throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state."
+                                              .format(topicAndPartition, s))
           }
       }
     }
-    info("The cleaning for partition %s is resumed".format(topicAndPartition))
+    info("Compaction for partition %s is resumed".format(topicAndPartition))
   }
 
   /**
@@ -194,7 +196,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
           inProgress.put(topicAndPartition, LogCleaningPaused)
           pausedCleaningCond.signalAll()
         case s =>
-          throw new IllegalStateException(("In-progress partiiton %s can't be in %s state").format(topicAndPartition, s))
+          throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s))
       }
     }
   }