You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/31 02:14:24 UTC

kafka git commit: KAFKA-2660; Correct cleanableRatio calculation

Repository: kafka
Updated Branches:
  refs/heads/trunk eec222e98 -> b94435699


KAFKA-2660; Correct cleanableRatio calculation

onurkaraman, could you have a look? This is the patch I discussed with you.

Author: Dong Lin <li...@gmail.com>
Author: Dong Lin <li...@cis.upenn.edu>

Reviewers: Onur Karaman <ok...@linkedin.com>, Joel Koshy <jj...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #316 from lindong28/KAFKA-2660


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9443569
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9443569
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9443569

Branch: refs/heads/trunk
Commit: b944356990d97401721a5752f8dcdd1ac5b8d4d4
Parents: eec222e
Author: Dong Lin <li...@gmail.com>
Authored: Fri Oct 30 18:14:20 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Oct 30 18:14:20 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |  1 -
 core/src/main/scala/kafka/log/LogCleaner.scala  | 11 ++++++----
 .../test/scala/unit/kafka/log/CleanerTest.scala | 23 +++++++++++++++-----
 3 files changed, 25 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b9443569/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 02205c9..9667aaa 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -353,7 +353,6 @@ class Log(val dir: File,
             .format(validMessages.sizeInBytes, config.segmentSize))
         }
 
-
         // maybe roll the log if this segment is full
         val segment = maybeRoll(validMessages.sizeInBytes)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9443569/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 16dd945..d5c247c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -274,9 +274,12 @@ class LogCleaner(val config: CleanerConfig,
  * This class holds the actual logic for cleaning a log
  * @param id An identifier used for logging
  * @param offsetMap The map used for deduplication
- * @param bufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
+ * @param ioBufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
+ * @param maxIoBufferSize The maximum size of a message that can appear in the log
+ * @param dupBufferLoadFactor The maximum percent full for the deduplication buffer
  * @param throttler The throttler instance to use for limiting I/O rate.
  * @param time The time instance
+ * @param checkDone Check if the cleaning for a partition is finished or aborted.
  */
 private[log] class Cleaner(val id: Int,
                            val offsetMap: OffsetMap,
@@ -541,8 +544,8 @@ private[log] class Cleaner(val id: Int,
       var indexSize = segs.head.index.sizeInBytes
       segs = segs.tail
       while(!segs.isEmpty &&
-            logSize + segs.head.size < maxSize &&
-            indexSize + segs.head.index.sizeInBytes < maxIndexSize &&
+            logSize + segs.head.size <= maxSize &&
+            indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
             segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
         group = segs.head :: group
         logSize += segs.head.size
@@ -686,7 +689,7 @@ private case class CleanerStats(time: Time = SystemTime) {
  * Helper class for a log, its topic/partition, and the last clean position
  */
 private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long) extends Ordered[LogToClean] {
-  val cleanBytes = log.logSegments(-1, firstDirtyOffset-1).map(_.size).sum
+  val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum
   val dirtyBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
   val cleanableRatio = dirtyBytes / totalBytes.toDouble
   def totalBytes = cleanBytes + dirtyBytes

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9443569/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 9c4518c..49869aa 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -107,6 +107,24 @@ class CleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testLogToClean: Unit = {
+    // create a log with small segment size
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // create 6 segments with only one message in each segment
+    val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+    for (i <- 0 until 6)
+      log.append(messageSet, assignOffsets = true)
+
+    val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset)
+
+    assertEquals("Total bytes of LogToClean should equal size of all segments excluding the active segment",
+      logToClean.totalBytes, log.size - log.activeSegment.size)
+  }
+
+  @Test
   def testCleaningWithUnkeyedMessages {
     val cleaner = makeCleaner(Int.MaxValue)
 
@@ -129,11 +147,6 @@ class CleanerTest extends JUnitSuite {
       log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
 
     val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
-
-    // turn on compaction and compact the log
-    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-
-    val compactedLog = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
     cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
 
     assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))