You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/21 07:07:07 UTC

kafka git commit: KAFKA-2977: Transient Failure in kafka.log.LogCleanerIntegrationTest.cleanerTest

Repository: kafka
Updated Branches:
  refs/heads/trunk 3615e4773 -> d2632d011


KAFKA-2977: Transient Failure in kafka.log.LogCleanerIntegrationTest.cleanerTest

Make MinCleanableDirtyRatioProp configurable (default 0.0F) in makeCleaner, so that log cleaning always takes place.
Also remove the minDirtyMessages parameter.

Author: jinxing <ji...@fenbi.com>
Author: ZoneMayor <ji...@126.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #671 from ZoneMayor/trunk-KAFKA-2977


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2632d01
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2632d01
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2632d01

Branch: refs/heads/trunk
Commit: d2632d011f7bda3cbd0cbcc8f2a3bd1e985d5b1b
Parents: 3615e47
Author: Jin Xing <ji...@fenbi.com>
Authored: Sun Dec 20 22:06:56 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Dec 20 22:06:56 2015 -0800

----------------------------------------------------------------------
 .../unit/kafka/log/LogCleanerIntegrationTest.scala     | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d2632d01/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index d7f3156..de3d7a3 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -57,11 +57,12 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     cleaner.startup()
 
     val firstDirty = log.activeSegment.baseOffset
-    // wait until we clean up to base_offset of active segment - minDirtyMessages
+    // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than LogConfig.MinCleanableDirtyRatioProp
     cleaner.awaitCleaned("log", 0, firstDirty)
-
+    val compactedSize = log.logSegments.map(_.size).sum
     val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
-    assertTrue("log cleaner should have processed up to offset " + firstDirty, lastCleaned >= firstDirty);
+    assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned", lastCleaned >= firstDirty)
+    assertTrue(s"log should have been compacted:  startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
     
     val read = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
@@ -73,7 +74,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     cleaner.awaitCleaned("log", 0, firstDirty2)
 
     val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
-    assertTrue("log cleaner should have processed up to offset " + firstDirty2, lastCleaned2 >= firstDirty2);
+    assertTrue(s"log cleaner should have processed up to offset $firstDirty2", lastCleaned2 >= firstDirty2);
 
     val read2 = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
@@ -123,7 +124,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   
   /* create a cleaner instance and logs with the given parameters */
   def makeCleaner(parts: Int, 
-                  minDirtyMessages: Int = 0, 
+                  minCleanableDirtyRatio: Float = 0.0F,
                   numThreads: Int = 1,
                   defaultPolicy: String = "compact",
                   policyOverrides: Map[String, String] = Map()): LogCleaner = {
@@ -138,6 +139,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
       logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
       logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
       logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+      logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
+
       val log = new Log(dir = dir,
                         LogConfig(logProps),
                         recoveryPoint = 0L,