You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/04/22 02:08:59 UTC
git commit: KAFKA-1327; Log cleaner metrics follow-up patch to reset dirtiest log cleanable ratio; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/0.8.1 1e9e107ee -> 874620d96
KAFKA-1327; Log cleaner metrics follow-up patch to reset dirtiest log
cleanable ratio; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/874620d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/874620d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/874620d9
Branch: refs/heads/0.8.1
Commit: 874620d965c066519686315c591e09aa379304d6
Parents: 1e9e107
Author: Joel Koshy <jj...@gmail.com>
Authored: Mon Apr 21 11:41:03 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Mon Apr 21 17:06:39 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogCleaner.scala | 10 ++++++----
.../scala/kafka/log/LogCleanerManager.scala | 20 +++++++++++---------
2 files changed, 17 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/874620d9/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index b9ffe00..2faa196 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -19,7 +19,6 @@ package kafka.log
import scala.collection._
import scala.math
-import java.util.concurrent.TimeUnit
import java.nio._
import java.util.Date
import java.io.File
@@ -215,6 +214,7 @@ class LogCleaner(val config: CleanerConfig,
*/
def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
this.lastStats = stats
+ cleaner.statsUnderlying.swap
def mb(bytes: Double) = bytes / (1024*1024)
val message =
"%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
@@ -260,9 +260,10 @@ private[log] class Cleaner(val id: Int,
this.logIdent = "Cleaner " + id + ": "
- /* stats on this cleaning */
- val stats = new CleanerStats(time)
-
+ /* cleaning stats - one instance for the current (or next) cleaning cycle and one for the last completed cycle */
+ val statsUnderlying = (new CleanerStats(time), new CleanerStats(time))
+ def stats = statsUnderlying._1
+
/* buffer used for read i/o */
private var readBuffer = ByteBuffer.allocate(ioBufferSize)
@@ -304,6 +305,7 @@ private[log] class Cleaner(val id: Int,
stats.bufferUtilization = offsetMap.utilization
stats.allDone()
+
endOffset
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/874620d9/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 683d722..e8ced6a 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -58,7 +58,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
private val pausedCleaningCond = lock.newCondition()
/* a gauge for tracking the cleanable ratio of the dirtiest log */
- private var dirtiestLogCleanableRatio = 0.0
+ @volatile private var dirtiestLogCleanableRatio = 0.0
newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
/**
@@ -79,9 +79,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
.filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
.map(l => LogToClean(l._1, l._2, // create a LogToClean instance for each
lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
- .filter(l => l.totalBytes > 0) // skip any empty logs
- if(!dirtyLogs.isEmpty)
- this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio
+ .filter(l => l.totalBytes > 0) // skip any empty logs
+ this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
if(cleanableLogs.isEmpty) {
None
@@ -126,7 +125,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
case LogCleaningInProgress =>
inProgress.put(topicAndPartition, LogCleaningAborted)
case s =>
- throw new IllegalStateException(("Partiiton %s can't be aborted and pasued since it's in %s state").format(topicAndPartition, s))
+ throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state."
+ .format(topicAndPartition, s))
}
}
while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
@@ -142,17 +142,19 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
inLock(lock) {
inProgress.get(topicAndPartition) match {
case None =>
- throw new IllegalStateException(("Partiiton %s can't be resumed since it's never paused").format(topicAndPartition))
+ throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused."
+ .format(topicAndPartition))
case Some(state) =>
state match {
case LogCleaningPaused =>
inProgress.remove(topicAndPartition)
case s =>
- throw new IllegalStateException(("Partiiton %s can't be resumed since it's in %s state").format(topicAndPartition, s))
+ throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state."
+ .format(topicAndPartition, s))
}
}
}
- info("The cleaning for partition %s is resumed".format(topicAndPartition))
+ info("Compaction for partition %s is resumed".format(topicAndPartition))
}
/**
@@ -194,7 +196,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
inProgress.put(topicAndPartition, LogCleaningPaused)
pausedCleaningCond.signalAll()
case s =>
- throw new IllegalStateException(("In-progress partiiton %s can't be in %s state").format(topicAndPartition, s))
+ throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s))
}
}
}