You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this text rendering.)
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/31 02:14:24 UTC
kafka git commit: KAFKA-2660; Correct cleanableRatio calculation
Repository: kafka
Updated Branches:
refs/heads/trunk eec222e98 -> b94435699
KAFKA-2660; Correct cleanableRatio calculation
@onurkaraman, could you have a look? This is the patch I discussed with you.
Author: Dong Lin <li...@gmail.com>
Author: Dong Lin <li...@cis.upenn.edu>
Reviewers: Onur Karaman <ok...@linkedin.com>, Joel Koshy <jj...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #316 from lindong28/KAFKA-2660
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9443569
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9443569
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9443569
Branch: refs/heads/trunk
Commit: b944356990d97401721a5752f8dcdd1ac5b8d4d4
Parents: eec222e
Author: Dong Lin <li...@gmail.com>
Authored: Fri Oct 30 18:14:20 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Oct 30 18:14:20 2015 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 1 -
core/src/main/scala/kafka/log/LogCleaner.scala | 11 ++++++----
.../test/scala/unit/kafka/log/CleanerTest.scala | 23 +++++++++++++++-----
3 files changed, 25 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9443569/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 02205c9..9667aaa 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -353,7 +353,6 @@ class Log(val dir: File,
.format(validMessages.sizeInBytes, config.segmentSize))
}
-
// maybe roll the log if this segment is full
val segment = maybeRoll(validMessages.sizeInBytes)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9443569/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 16dd945..d5c247c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -274,9 +274,12 @@ class LogCleaner(val config: CleanerConfig,
* This class holds the actual logic for cleaning a log
* @param id An identifier used for logging
* @param offsetMap The map used for deduplication
- * @param bufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
+ * @param ioBufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
+ * @param maxIoBufferSize The maximum size of a message that can appear in the log
+ * @param dupBufferLoadFactor The maximum percent full for the deduplication buffer
* @param throttler The throttler instance to use for limiting I/O rate.
* @param time The time instance
+ * @param checkDone Check if the cleaning for a partition is finished or aborted.
*/
private[log] class Cleaner(val id: Int,
val offsetMap: OffsetMap,
@@ -541,8 +544,8 @@ private[log] class Cleaner(val id: Int,
var indexSize = segs.head.index.sizeInBytes
segs = segs.tail
while(!segs.isEmpty &&
- logSize + segs.head.size < maxSize &&
- indexSize + segs.head.index.sizeInBytes < maxIndexSize &&
+ logSize + segs.head.size <= maxSize &&
+ indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
group = segs.head :: group
logSize += segs.head.size
@@ -686,7 +689,7 @@ private case class CleanerStats(time: Time = SystemTime) {
* Helper class for a log, its topic/partition, and the last clean position
*/
private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long) extends Ordered[LogToClean] {
- val cleanBytes = log.logSegments(-1, firstDirtyOffset-1).map(_.size).sum
+ val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum
val dirtyBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
val cleanableRatio = dirtyBytes / totalBytes.toDouble
def totalBytes = cleanBytes + dirtyBytes
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9443569/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 9c4518c..49869aa 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -107,6 +107,24 @@ class CleanerTest extends JUnitSuite {
}
@Test
+ def testLogToClean: Unit = {
+ // create a log with small segment size
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ // create 6 segments with only one message in each segment
+ val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+ for (i <- 0 until 6)
+ log.append(messageSet, assignOffsets = true)
+
+ val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset)
+
+ assertEquals("Total bytes of LogToClean should equal size of all segments excluding the active segment",
+ logToClean.totalBytes, log.size - log.activeSegment.size)
+ }
+
+ @Test
def testCleaningWithUnkeyedMessages {
val cleaner = makeCleaner(Int.MaxValue)
@@ -129,11 +147,6 @@ class CleanerTest extends JUnitSuite {
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
-
- // turn on compaction and compact the log
- logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-
- val compactedLog = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))