You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/21 07:07:07 UTC
kafka git commit: KAFKA-2977: Transient Failure in
kafka.log.LogCleanerIntegrationTest.cleanerTest
Repository: kafka
Updated Branches:
refs/heads/trunk 3615e4773 -> d2632d011
KAFKA-2977: Transient Failure in kafka.log.LogCleanerIntegrationTest.cleanerTest
Make MinCleanableDirtyRatioProp configurable (default 0.0F) in makeCleaner, so that log cleaning is always triggered.
Also removed minDirtyMessages.
Author: jinxing <ji...@fenbi.com>
Author: ZoneMayor <ji...@126.com>
Reviewers: Ismael Juma, Guozhang Wang
Closes #671 from ZoneMayor/trunk-KAFKA-2977
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2632d01
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2632d01
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2632d01
Branch: refs/heads/trunk
Commit: d2632d011f7bda3cbd0cbcc8f2a3bd1e985d5b1b
Parents: 3615e47
Author: Jin Xing <ji...@fenbi.com>
Authored: Sun Dec 20 22:06:56 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Dec 20 22:06:56 2015 -0800
----------------------------------------------------------------------
.../unit/kafka/log/LogCleanerIntegrationTest.scala | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2632d01/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index d7f3156..de3d7a3 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -57,11 +57,12 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
cleaner.startup()
val firstDirty = log.activeSegment.baseOffset
- // wait until we clean up to base_offset of active segment - minDirtyMessages
+ // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than LogConfig.MinCleanableDirtyRatioProp
cleaner.awaitCleaned("log", 0, firstDirty)
-
+ val compactedSize = log.logSegments.map(_.size).sum
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
- assertTrue("log cleaner should have processed up to offset " + firstDirty, lastCleaned >= firstDirty);
+ assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned", lastCleaned >= firstDirty)
+ assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
val read = readFromLog(log)
assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
@@ -73,7 +74,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
cleaner.awaitCleaned("log", 0, firstDirty2)
val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
- assertTrue("log cleaner should have processed up to offset " + firstDirty2, lastCleaned2 >= firstDirty2);
+ assertTrue(s"log cleaner should have processed up to offset $firstDirty2", lastCleaned2 >= firstDirty2);
val read2 = readFromLog(log)
assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
@@ -123,7 +124,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
/* create a cleaner instance and logs with the given parameters */
def makeCleaner(parts: Int,
- minDirtyMessages: Int = 0,
+ minCleanableDirtyRatio: Float = 0.0F,
numThreads: Int = 1,
defaultPolicy: String = "compact",
policyOverrides: Map[String, String] = Map()): LogCleaner = {
@@ -138,6 +139,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
+
val log = new Log(dir = dir,
LogConfig(logProps),
recoveryPoint = 0L,