Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/08 19:55:00 UTC

[jira] [Commented] (KAFKA-7215) Improve LogCleaner behavior on error

    [ https://issues.apache.org/jira/browse/KAFKA-7215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642395#comment-16642395 ] 

ASF GitHub Bot commented on KAFKA-7215:
---------------------------------------

junrao closed pull request #5439: KAFKA-7215: Improve LogCleaner Error Handling
URL: https://github.com/apache/kafka/pull/5439

This is a PR merged from a forked repository. Since GitHub hides the original diff of a fork PR once it is merged, the diff is reproduced below for the sake of provenance:

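In brief, the patch restructures the cleaner thread's main loop so that an unexpected exception marks only the offending partition as uncleanable (and surfaces it through new per-directory gauges) instead of killing the thread, while shutdown and control throwables are still rethrown. A minimal, hypothetical sketch of that pattern follows; the TopicPartition, CleanerManager and CleanerThread below are simplified stand-ins, not Kafka's actual classes, and Thread.sleep stands in for the real pausable back-off (see the diff below for the real implementation):

    import scala.collection.mutable
    import scala.util.control.ControlThrowable

    // Simplified stand-ins for the real Kafka classes touched by this patch.
    final case class TopicPartition(topic: String, partition: Int)

    class CleanerManager {
      // partitions that hit an unexpected error; later cleaning rounds skip them
      private val uncleanable = mutable.Set.empty[TopicPartition]
      def markPartitionUncleanable(tp: TopicPartition): Unit = uncleanable += tp
      def isUncleanable(tp: TopicPartition): Boolean = uncleanable.contains(tp)
    }

    class CleanerThread(manager: CleanerManager,
                        grabFilthiestLog: () => Option[TopicPartition],
                        cleanLog: TopicPartition => Unit,
                        backOffMs: Long) {

      /** One iteration of the cleaner loop: clean a log if one is available, otherwise back off. */
      def doWork(): Unit = {
        val cleaned = cleanFilthiestLog()
        if (!cleaned)
          Thread.sleep(backOffMs) // the real thread uses a pausable condition, not sleep
      }

      /** @return whether a log was cleaned */
      private def cleanFilthiestLog(): Boolean = {
        var currentLog: Option[TopicPartition] = None
        try {
          grabFilthiestLog() match {
            case None => false
            case Some(tp) =>
              currentLog = Some(tp)
              cleanLog(tp)
              true
          }
        } catch {
          // shutdown / control-flow throwables must still propagate
          case e: ControlThrowable => throw e
          // any other failure quarantines just the offending partition instead of killing the thread
          case e: Exception =>
            val tp = currentLog.getOrElse(
              throw new IllegalStateException("no log selected when an unexpected exception occurred", e))
            println(s"Unexpected exception cleaning $tp; marking it uncleanable: $e")
            manager.markPartitionUncleanable(tp)
            false
        }
      }
    }

The essential point, visible in cleanFilthiestLog() in the diff, is that only the single failing partition is quarantined; the cleaner thread itself keeps running.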

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8915c14b364..094473a8e26 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -193,7 +193,7 @@ class Log(@volatile var dir: File,
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
-  // The memory mapped buffer for index files of this log will be closed for index files of this log will be closed with either delete() or closeHandlers()
+  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
   // After memory mapped buffer is closed, no disk IO operation should be performed for this log
   @volatile private var isMemoryMappedBufferClosed = false
 
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index bf4f7e1fcba..0416325a4b8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.collection.{Iterable, Set, mutable}
+import scala.util.control.ControlThrowable
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
@@ -293,49 +294,75 @@ class LogCleaner(initialConfig: CleanerConfig,
 
     /**
      * The main loop for the cleaner thread
+     * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
     override def doWork() {
-      cleanOrSleep()
+      val cleaned = cleanFilthiestLog()
+      if (!cleaned)
+        pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
 
     /**
-     * Clean a log if there is a dirty log available, otherwise sleep for a bit
-     */
-    private def cleanOrSleep() {
-      val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
-        case None =>
-          false
-        case Some(cleanable) =>
-          // there's a log, clean it
-          var endOffset = cleanable.firstDirtyOffset
-          try {
-            val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
-            recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
-            endOffset = nextDirtyOffset
-          } catch {
-            case _: LogCleaningAbortedException => // task can be aborted, let it go.
-            case _: KafkaStorageException => // partition is already offline. let it go.
-            case e: IOException =>
-              val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException"
-              logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e)
-          } finally {
-            cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
+      * Cleans a log if there is a dirty log available
+      * @return whether a log was cleaned
+      */
+    private def cleanFilthiestLog(): Boolean = {
+      var currentLog: Option[Log] = None
+
+      try {
+        val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
+          case None =>
+            false
+          case Some(cleanable) =>
+            // there's a log, clean it
+            currentLog = Some(cleanable.log)
+            cleanLog(cleanable)
+            true
+        }
+        val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
+        try {
+          deletable.foreach {
+            case (topicPartition, log) =>
+              try {
+                currentLog = Some(log)
+                log.deleteOldSegments()
+              }
           }
-          true
+        } finally  {
+          cleanerManager.doneDeleting(deletable.map(_._1))
+        }
+
+        cleaned
+      } catch {
+        case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
+        case e: Exception =>
+          if (currentLog.isEmpty) {
+            throw new IllegalStateException("currentLog cannot be empty on an unexpected exception", e)
+          }
+          val erroneousLog = currentLog.get
+          warn(s"Unexpected exception thrown when cleaning log $erroneousLog. Marking its partition (${erroneousLog.topicPartition}) as uncleanable", e)
+          cleanerManager.markPartitionUncleanable(erroneousLog.dir.getParent, erroneousLog.topicPartition)
+
+          false
       }
-      val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
+    }
 
+    private def cleanLog(cleanable: LogToClean): Unit = {
+      var endOffset = cleanable.firstDirtyOffset
       try {
-        deletable.foreach {
-          case (_, log) =>
-            log.deleteOldSegments()
-        }
+        val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
+        recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
+        endOffset = nextDirtyOffset
+      } catch {
+        case _: LogCleaningAbortedException => // task can be aborted, let it go.
+        case _: KafkaStorageException => // partition is already offline. let it go.
+        case e: IOException =>
+          var logDirectory = cleanable.log.dir.getParent
+          val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException"
+          logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e)
       } finally {
-        cleanerManager.doneDeleting(deletable.map(_._1))
+        cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
       }
-
-      if (!cleaned)
-        pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
 
     /**
@@ -398,6 +425,18 @@ object LogCleaner {
     LogSegment.open(log.dir, baseOffset, log.config, Time.SYSTEM, fileAlreadyExists = false,
       fileSuffix = Log.CleanedFileSuffix, initFileSize = log.initFileSize, preallocate = log.config.preallocate)
   }
+
+  /**
+    * Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log
+    * @return the biggest uncleanable offset and the total amount of cleanable bytes
+    */
+  def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
+    val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment)
+    val firstUncleanableOffset = firstUncleanableSegment.baseOffset
+    val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
+
+    (firstUncleanableOffset, cleanableBytes)
+  }
 }
 
 /**
@@ -951,9 +990,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
  */
 private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
   val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size.toLong).sum
-  private[this] val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment)
-  val firstUncleanableOffset = firstUncleanableSegment.baseOffset
-  val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
+  val (firstUncleanableOffset, cleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
   val totalBytes = cleanBytes + cleanableBytes
   val cleanableRatio = cleanableBytes / totalBytes.toDouble
   override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 7a96d8f4d80..13d14c17123 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -73,12 +73,55 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   /* the set of logs currently being cleaned */
   private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]()
 
+  /* the set of uncleanable partitions (partitions that have raised an unexpected error during cleaning)
+   *   for each log directory */
+  private val uncleanablePartitions = mutable.HashMap[String, mutable.Set[TopicPartition]]()
+
+  /* the set of directories marked as uncleanable and therefore offline */
+  private val uncleanableDirs = mutable.HashSet[String]()
+
   /* a global lock used to control all access to the in-progress set and the offset checkpoints */
   private val lock = new ReentrantLock
 
   /* for coordinating the pausing and the cleaning of a partition */
   private val pausedCleaningCond = lock.newCondition()
 
+  /* gauges for tracking the number of partitions marked as uncleanable for each log directory */
+  for (dir <- logDirs) {
+    newGauge(
+      "uncleanable-partitions-count",
+      new Gauge[Int] { def value = inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) } },
+      Map("logDirectory" -> dir.getAbsolutePath)
+    )
+  }
+
+  /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
+    for (dir <- logDirs) {
+      newGauge(
+        "uncleanable-bytes",
+        new Gauge[Long] {
+          def value = {
+            inLock(lock) {
+              uncleanablePartitions.get(dir.getAbsolutePath) match {
+                case Some(partitions) => {
+                  val lastClean = allCleanerCheckpoints
+                  val now = Time.SYSTEM.milliseconds
+                  partitions.map { tp =>
+                    val log = logs.get(tp)
+                    val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, tp, lastClean, now)
+                    val (_, uncleanableBytes) = LogCleaner.calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset)
+                    uncleanableBytes
+                  }.sum
+                }
+                case _ => 0
+              }
+            }
+          }
+        },
+        Map("logDirectory" -> dir.getAbsolutePath)
+      )
+    }
+
   /* a gauge for tracking the cleanable ratio of the dirtiest log */
   @volatile private var dirtiestLogCleanableRatio = 0.0
   newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
@@ -135,7 +178,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val dirtyLogs = logs.filter {
         case (_, log) => log.config.compact  // match logs that are marked as compacted
       }.filterNot {
-        case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
+        case (topicPartition, log) =>
+          // skip any logs already in-progress and uncleanable partitions
+          inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
       }.map {
         case (topicPartition, log) => // create a LogToClean instance for each
           val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
@@ -179,13 +224,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-    * Find any logs that have compaction enabled. Include logs without delete enabled, as they may have segments
+    * Find any logs that have compaction enabled. Mark them as being cleaned
+    * Include logs without delete enabled, as they may have segments
     * that precede the start offset.
     */
   def deletableLogs(): Iterable[(TopicPartition, Log)] = {
     inLock(lock) {
       val toClean = logs.filter { case (topicPartition, log) =>
-        !inProgress.contains(topicPartition) && log.config.compact
+        !inProgress.contains(topicPartition) && log.config.compact &&
+          !isUncleanablePartition(log, topicPartition)
       }
       toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
       toClean
@@ -332,6 +379,12 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         case e: KafkaStorageException =>
           error(s"Failed to access checkpoint file in dir ${sourceLogDir.getAbsolutePath}", e)
       }
+
+      val logUncleanablePartitions = uncleanablePartitions.getOrElse(sourceLogDir.toString, mutable.Set[TopicPartition]())
+      if (logUncleanablePartitions.contains(topicPartition)) {
+        logUncleanablePartitions.remove(topicPartition)
+        markPartitionUncleanable(destLogDir.toString, topicPartition)
+      }
     }
   }
 
@@ -393,6 +446,33 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       }
     }
   }
+
+  /**
+   * Returns an immutable set of the uncleanable partitions for a given log directory
+   * Only used for testing
+   */
+  private[log] def uncleanablePartitions(logDir: String): Set[TopicPartition] = {
+    var partitions: Set[TopicPartition] = Set()
+    inLock(lock) { partitions ++= uncleanablePartitions.getOrElse(logDir, partitions) }
+    partitions
+  }
+
+  def markPartitionUncleanable(logDir: String, partition: TopicPartition): Unit = {
+    inLock(lock) {
+      uncleanablePartitions.get(logDir) match {
+        case Some(partitions) =>
+          partitions.add(partition)
+        case None =>
+          uncleanablePartitions.put(logDir, mutable.Set(partition))
+      }
+    }
+  }
+
+  private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = {
+    inLock(lock) {
+      uncleanablePartitions.get(log.dir.getParent).exists(partitions => partitions.contains(topicPartition))
+    }
+  }
 }
 
 private[log] object LogCleanerManager extends Logging {
diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
index c78f04ecf82..897d3fc36f8 100644
--- a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
+++ b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
@@ -45,9 +45,8 @@ class LogDirFailureChannel(logDirNum: Int) extends Logging {
    */
   def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
     error(msg, e)
-    if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) {
+    if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)
       offlineLogDirQueue.add(logDir)
-    }
   }
 
   /*
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 0ad5b46de2c..2a483fa95d9 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -24,10 +24,13 @@ import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.{MockTime, Pool, TestUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch}
 import org.apache.kafka.common.utils.Utils
 import org.junit.After
 
+import scala.collection.Seq
 import scala.collection.mutable.ListBuffer
+import scala.util.Random
 
 abstract class AbstractLogCleanerIntegrationTest {
 
@@ -118,4 +121,31 @@ abstract class AbstractLogCleanerIntegrationTest {
       logDirFailureChannel = new LogDirFailureChannel(1),
       time = time)
   }
+
+  def codec: CompressionType
+  private var ctr = 0
+  def counter: Int = ctr
+  def incCounter(): Unit = ctr += 1
+
+  def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
+                        startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
+    for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
+      val value = counter.toString
+      val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
+        key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
+      incCounter()
+      (key, value, appendInfo.firstOffset.get)
+    }
+  }
+
+  def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = {
+    def messageValue(length: Int): String = {
+      val random = new Random(0)
+      new String(random.alphanumeric.take(length).toArray)
+    }
+    val value = messageValue(128)
+    val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes,
+      magicValue = messageFormatVersion)
+    (value, messageSet)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
old mode 100755
new mode 100644
index 64e8b38cc95..bfee811167c
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -1,349 +1,96 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 
 package kafka.log
 
-import java.io.File
-import java.util.Properties
+import java.io.PrintWriter
 
-import kafka.api.KAFKA_0_11_0_IV0
-import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
-import kafka.server.KafkaConfig
-import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.utils._
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Gauge
+import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record._
-import org.junit.Assert._
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
+import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
+import org.apache.kafka.common.record.{CompressionType, RecordBatch}
+import org.junit.Assert.{assertFalse, assertTrue, fail}
+import org.junit.Test
 
-import scala.Seq
-import scala.collection._
-import scala.util.Random
+import scala.collection.JavaConverters.mapAsScalaMapConverter
 
 /**
- * This is an integration test that tests the fully integrated log cleaner
- */
-@RunWith(value = classOf[Parameterized])
-class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCleanerIntegrationTest {
+  * This is an integration test that tests the fully integrated log cleaner
+  */
+class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
+
+  val codec: CompressionType = CompressionType.LZ4
 
-  val codec = CompressionType.forName(compressionCodec)
   val time = new MockTime()
-  var counter = 0
   val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
 
-  @Test
-  def cleanerTest() {
+  @Test(timeout = DEFAULT_MAX_WAIT_MS)
+  def testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics() {
     val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
+    val (_, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
     val maxMessageSize = largeMessageSet.sizeInBytes
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, backOffMs = 100)
 
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
-    val log = cleaner.logs.get(topicPartitions(0))
-
-    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
-    val startSize = log.size
-    cleaner.startup()
-
-    val firstDirty = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.map(_.size).sum
-    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-
-    checkLogAfterAppendingDups(log, startSize, appends)
-
-    val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
-    val largeMessageOffset = appendInfo.firstOffset.get
-
-    val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
-    val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dups
-    val firstDirty2 = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty2)
-
-    checkLogAfterAppendingDups(log, startSize, appends2)
-
-    // simulate deleting a partition, by removing it from logs
-    // force a checkpoint
-    // and make sure its gone from checkpoint file
-    cleaner.logs.remove(topicPartitions(0))
-    cleaner.updateCheckpoints(logDir)
-    val checkpoints = new OffsetCheckpointFile(new File(logDir, cleaner.cleanerManager.offsetCheckpointFile)).read()
-    // we expect partition 0 to be gone
-    assertFalse(checkpoints.contains(topicPartitions(0)))
-  }
-
-  @Test
-  def testCleansCombinedCompactAndDeleteTopic(): Unit = {
-    val logProps  = new Properties()
-    val retentionMs: Integer = 100000
-    logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
-    logProps.put(LogConfig.CleanupPolicyProp, "compact,delete")
-
-    def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String, Long)]) = {
-      cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backOffMs = 100L)
-      val log = cleaner.logs.get(topicPartitions(0))
-
-      val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
-      val startSize = log.size
-
-      log.onHighWatermarkIncremented(log.logEndOffset)
-
-      val firstDirty = log.activeSegment.baseOffset
-      cleaner.startup()
-
-      // should compact the log
-      checkLastCleaned("log", 0, firstDirty)
-      val compactedSize = log.logSegments.map(_.size).sum
-      assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-      (log, messages)
-    }
-
-    val (log, _) = runCleanerAndCheckCompacted(100)
-    // should delete old segments
-    log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs))
-
-    TestUtils.waitUntilTrue(() => log.numberOfSegments == 1, "There should only be 1 segment remaining", 10000L)
-    assertEquals(1, log.numberOfSegments)
+    def breakPartitionLog(tp: TopicPartition): Unit = {
+      val log = cleaner.logs.get(tp)
+      writeDups(numKeys = 20, numDups = 3, log = log, codec = codec)
 
-    cleaner.shutdown()
+      val partitionFile = log.logSegments.last.log.file()
+      val writer = new PrintWriter(partitionFile)
+      writer.write("jogeajgoea")
+      writer.close()
 
-    // run the cleaner again to make sure if there are no issues post deletion
-    val (log2, messages) = runCleanerAndCheckCompacted(20)
-    val read = readFromLog(log2)
-    assertEquals("Contents of the map shouldn't change", toMap(messages), toMap(read))
-  }
-
-  // returns (value, ByteBufferMessageSet)
-  private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = {
-    def messageValue(length: Int): String = {
-      val random = new Random(0)
-      new String(random.alphanumeric.take(length).toArray)
+      writeDups(numKeys = 20, numDups = 3, log = log, codec = codec)
     }
-    val value = messageValue(128)
-    val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes,
-      magicValue = messageFormatVersion)
-    (value, messageSet)
-  }
 
-  @Test
-  def testCleanerWithMessageFormatV0(): Unit = {
-    val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0)
-    val maxMessageSize = codec match {
-      case CompressionType.NONE => largeMessageSet.sizeInBytes
-      case _ =>
-        // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
-        // increase because the broker offsets are larger than the ones assigned by the client
-        // adding `5` to the message set size is good enough for this test: it covers the increased message size while
-        // still being less than the overhead introduced by the conversion from message format version 0 to 1
-        largeMessageSet.sizeInBytes + 5
+    def getGauge[T](metricName: String, metricScope: String): Gauge[T] = {
+      Metrics.defaultRegistry.allMetrics.asScala
+        .filterKeys(k => {
+          k.getName.endsWith(metricName) && k.getScope.endsWith(metricScope)
+        })
+        .headOption
+        .getOrElse { fail(s"Unable to find metric $metricName") }
+        .asInstanceOf[(Any, Gauge[Any])]
+        ._2
+        .asInstanceOf[Gauge[T]]
     }
 
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
-
-    val log = cleaner.logs.get(topicPartitions(0))
-    val props = logConfigProperties(maxMessageSize = maxMessageSize)
-    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
-    log.config = new LogConfig(props)
+    breakPartitionLog(topicPartitions(0))
+    breakPartitionLog(topicPartitions(1))
 
-    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
-    val startSize = log.size
     cleaner.startup()
 
-    val firstDirty = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.map(_.size).sum
-    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-
-    checkLogAfterAppendingDups(log, startSize, appends)
-
-    val appends2: Seq[(Int, String, Long)] = {
-      val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
-      val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
-      val largeMessageOffset = appendInfo.firstOffset.get
-
-      // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
-      props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_11_0_IV0.version)
-      log.config = new LogConfig(props)
-      val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
-      val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V2)
-      appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2
-    }
-    val firstDirty2 = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty2)
-
-    checkLogAfterAppendingDups(log, startSize, appends2)
-  }
-
-  @Test
-  def testCleaningNestedMessagesWithMultipleVersions(): Unit = {
-    val maxMessageSize = 192
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
-
     val log = cleaner.logs.get(topicPartitions(0))
-    val props = logConfigProperties(maxMessageSize = maxMessageSize)
-    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
-    log.config = new LogConfig(props)
-
-    // with compression enabled, these messages will be written as a single message containing
-    // all of the individual messages
-    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
-    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
-
-    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
-    log.config = new LogConfig(props)
-
-    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
-    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
-    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
-
-    val appends = appendsV0 ++ appendsV1
-
-    val startSize = log.size
-    cleaner.startup()
-
-    val firstDirty = log.activeSegment.baseOffset
-    assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1
-
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.map(_.size).sum
-    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-
-    checkLogAfterAppendingDups(log, startSize, appends)
-  }
-
-  @Test
-  def cleanerConfigUpdateTest() {
-    val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
-    val maxMessageSize = largeMessageSet.sizeInBytes
-
-    cleaner = makeCleaner(partitions = topicPartitions, backOffMs = 1, maxMessageSize = maxMessageSize,
-      cleanerIoBufferSize = Some(1))
-    val log = cleaner.logs.get(topicPartitions(0))
-
-    writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
-    val startSize = log.size
-    cleaner.startup()
-    assertEquals(1, cleaner.cleanerCount)
-
-    // Verify no cleaning with LogCleanerIoBufferSizeProp=1
-    val firstDirty = log.activeSegment.baseOffset
-    val topicPartition = new TopicPartition("log", 0)
-    cleaner.awaitCleaned(topicPartition, firstDirty, maxWaitMs = 10)
-    assertTrue("Should not have cleaned", cleaner.cleanerManager.allCleanerCheckpoints.isEmpty)
-
-    def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): KafkaConfig = {
-      val props = TestUtils.createBrokerConfig(0, "localhost:2181")
-      props.put(KafkaConfig.LogCleanerThreadsProp, cleanerConfig.numThreads.toString)
-      props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, cleanerConfig.dedupeBufferSize.toString)
-      props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, cleanerConfig.dedupeBufferLoadFactor.toString)
-      props.put(KafkaConfig.LogCleanerIoBufferSizeProp, cleanerConfig.ioBufferSize.toString)
-      props.put(KafkaConfig.MessageMaxBytesProp, cleanerConfig.maxMessageSize.toString)
-      props.put(KafkaConfig.LogCleanerBackoffMsProp, cleanerConfig.backOffMs.toString)
-      props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, cleanerConfig.maxIoBytesPerSecond.toString)
-      KafkaConfig.fromProps(props)
-    }
-
-    // Verify cleaning done with larger LogCleanerIoBufferSizeProp
-    val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig)
-    val newConfig = kafkaConfigWithCleanerConfig(CleanerConfig(numThreads = 2,
-      dedupeBufferSize = cleaner.currentConfig.dedupeBufferSize,
-      dedupeBufferLoadFactor = cleaner.currentConfig.dedupeBufferLoadFactor,
-      ioBufferSize = 100000,
-      maxMessageSize = cleaner.currentConfig.maxMessageSize,
-      maxIoBytesPerSecond = cleaner.currentConfig.maxIoBytesPerSecond,
-      backOffMs = cleaner.currentConfig.backOffMs))
-    cleaner.reconfigure(oldConfig, newConfig)
-
-    assertEquals(2, cleaner.cleanerCount)
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.map(_.size).sum
-    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
-  }
-
-  private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) {
-    // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than
-    // LogConfig.MinCleanableDirtyRatioProp
-    val topicPartition = new TopicPartition(topic, partitionId)
-    cleaner.awaitCleaned(topicPartition, firstDirty)
-    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(topicPartition)
-    assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned",
-      lastCleaned >= firstDirty)
-  }
-
-  private def checkLogAfterAppendingDups(log: Log, startSize: Long, appends: Seq[(Int, String, Long)]) {
-    val read = readFromLog(log)
-    assertEquals("Contents of the map shouldn't change", toMap(appends), toMap(read))
-    assertTrue(startSize > log.size)
-  }
-
-  private def toMap(messages: Iterable[(Int, String, Long)]): Map[Int, (String, Long)] = {
-    messages.map { case (key, value, offset) => key -> (value, offset) }.toMap
-  }
-
-  private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
-    import JavaConverters._
-    for (segment <- log.logSegments; deepLogEntry <- segment.log.records.asScala) yield {
-      val key = TestUtils.readString(deepLogEntry.key).toInt
-      val value = TestUtils.readString(deepLogEntry.value)
-      (key, value, deepLogEntry.offset)
-    }
-  }
-
-  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-                        startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
-    for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
-      val value = counter.toString
-      val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
-              key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
-      counter += 1
-      (key, value, appendInfo.firstOffset.get)
-    }
-  }
-
-  private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-                                        startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = {
-    val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
-      val payload = counter.toString
-      counter += 1
-      (key, payload)
-    }
-
-    val records = kvs.map { case (key, payload) =>
-      new SimpleRecord(key.toString.getBytes, payload.toString.getBytes)
-    }
-
-    val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
-    val offsets = appendInfo.firstOffset.get to appendInfo.lastOffset
-
-    kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
-  }
-
-}
-
-object LogCleanerIntegrationTest {
-  @Parameters
-  def parameters: java.util.Collection[Array[String]] = {
-    val list = new java.util.ArrayList[Array[String]]()
-    for (codec <- CompressionType.values)
-      list.add(Array(codec.name))
-    list
+    val log2 = cleaner.logs.get(topicPartitions(1))
+    val uncleanableDirectory = log.dir.getParent
+    val uncleanablePartitionsCountGauge = getGauge[Int]("uncleanable-partitions-count", uncleanableDirectory)
+    val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes", uncleanableDirectory)
+
+    TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() == 2, "There should be 2 uncleanable partitions", 2000L)
+    val expectedTotalUncleanableBytes = LogCleaner.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 +
+      LogCleaner.calculateCleanableBytes(log2, 0, log2.logSegments.last.baseOffset)._2
+    TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() == expectedTotalUncleanableBytes,
+      s"There should be $expectedTotalUncleanableBytes uncleanable bytes", 1000L)
+
+    val uncleanablePartitions = cleaner.cleanerManager.uncleanablePartitions(uncleanableDirectory)
+    assertTrue(uncleanablePartitions.contains(topicPartitions(0)))
+    assertTrue(uncleanablePartitions.contains(topicPartitions(1)))
+    assertFalse(uncleanablePartitions.contains(topicPartitions(2)))
   }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index bf634d731dd..6e8c9b9d336 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -41,9 +41,10 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
   val cleanerBackOffMs = 200L
   val segmentSize = 512
-  var counter = 0
+
+  override def codec: CompressionType = CompressionType.forName(compressionCodecName)
+
   val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
-  val compressionCodec = CompressionType.forName(compressionCodecName)
 
   @Test
   def cleanerTest(): Unit = {
@@ -55,7 +56,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
 
     // t = T0
     val T0 = time.milliseconds
-    val appends0 = writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T0)
+    val appends0 = writeDups(numKeys = 100, numDups = 3, log, codec, timestamp = T0)
     val startSizeBlock0 = log.size
     debug(s"total log size at T0: $startSizeBlock0")
 
@@ -78,7 +79,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
     val T1 = time.milliseconds
 
     // write another block of data
-    val appends1 = appends0 ++ writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T1)
+    val appends1 = appends0 ++ writeDups(numKeys = 100, numDups = 3, log, codec, timestamp = T1)
     val firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset
 
     // the first block should get cleaned
@@ -111,11 +112,10 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Abstrac
       val count = counter
       log.appendAsLeader(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec,
               key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0)
-      counter += 1
+      incCounter()
       (key, count)
     }
   }
-
 }
 
 object LogCleanerLagIntegrationTest {
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 2a4869098e7..8ca26a86d16 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -29,6 +29,8 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import org.scalatest.junit.JUnitSuite
 
+import scala.collection.mutable
+
 /**
   * Unit tests for the log cleaning logic
   */
@@ -36,6 +38,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("log", 0)
   val logProps = new Properties()
   logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
   logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
@@ -43,11 +46,124 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   val logConfig = LogConfig(logProps)
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
 
+  val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
+
+  class LogCleanerManagerMock(logDirs: Seq[File],
+                              logs: Pool[TopicPartition, Log],
+                              logDirFailureChannel: LogDirFailureChannel) extends LogCleanerManager(logDirs, logs, logDirFailureChannel) {
+    override def allCleanerCheckpoints: Map[TopicPartition, Long] = {
+      cleanerCheckpoints.toMap
+    }
+  }
+
   @After
   def tearDown(): Unit = {
     Utils.delete(tmpDir)
   }
 
+  @Test
+  def testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes)
+    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
+    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
+    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+
+    val logs = new Pool[TopicPartition, Log]()
+    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
+    logs.put(tp1, log1)
+    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
+    logs.put(tp2, log2)
+    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
+    logs.put(tp3, log3)
+    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
+    cleanerCheckpoints.put(tp1, 0) // all clean
+    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
+    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
+
+    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
+
+    assertEquals(log2, filthiestLog.log)
+    assertEquals(tp2, filthiestLog.topicPartition)
+  }
+
+  @Test
+  def testGrabFilthiestCompactedLogIgnoresUncleanablePartitions(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes)
+    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
+    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
+    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+
+    val logs = new Pool[TopicPartition, Log]()
+    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
+    logs.put(tp1, log1)
+    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
+    logs.put(tp2, log2)
+    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
+    logs.put(tp3, log3)
+    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
+    cleanerCheckpoints.put(tp1, 0) // all clean
+    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
+    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
+    cleanerManager.markPartitionUncleanable(log2.dir.getParent, tp2)
+
+    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
+
+    assertEquals(log3, filthiestLog.log)
+    assertEquals(tp3, filthiestLog.topicPartition)
+  }
+
+  @Test
+  def testGrabFilthiestCompactedLogIgnoresInProgressPartitions(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes)
+    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
+    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
+    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+
+    val logs = new Pool[TopicPartition, Log]()
+    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
+    logs.put(tp1, log1)
+    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
+    logs.put(tp2, log2)
+    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
+    logs.put(tp3, log3)
+    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
+    cleanerCheckpoints.put(tp1, 0) // all clean
+    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
+    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
+    cleanerManager.setCleaningState(tp2, LogCleaningInProgress)
+
+    val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
+
+    assertEquals(log3, filthiestLog.log)
+    assertEquals(tp3, filthiestLog.topicPartition)
+  }
+
+  @Test
+  def testGrabFilthiestCompactedLogIgnoresBothInProgressPartitionsAndUncleanablePartitions(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes)
+    val log1: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact, 1)
+    val log2: Log = createLog(records.sizeInBytes * 10, LogConfig.Compact, 2)
+    val log3: Log = createLog(records.sizeInBytes * 15, LogConfig.Compact, 3)
+
+    val logs = new Pool[TopicPartition, Log]()
+    val tp1 = new TopicPartition("wishing well", 0) // active segment starts at 0
+    logs.put(tp1, log1)
+    val tp2 = new TopicPartition("wishing well", 1) // active segment starts at 10
+    logs.put(tp2, log2)
+    val tp3 = new TopicPartition("wishing well", 2) // // active segment starts at 20
+    logs.put(tp3, log3)
+    val cleanerManager: LogCleanerManagerMock = createCleanerManager(logs, toMock = true).asInstanceOf[LogCleanerManagerMock]
+    cleanerCheckpoints.put(tp1, 0) // all clean
+    cleanerCheckpoints.put(tp2, 1) // dirtiest - 9 dirty messages
+    cleanerCheckpoints.put(tp3, 15) // 5 dirty messages
+    cleanerManager.setCleaningState(tp2, LogCleaningInProgress)
+    cleanerManager.markPartitionUncleanable(log3.dir.getParent, tp3)
+
+    val filthiestLog: Option[LogToClean] = cleanerManager.grabFilthiestCompactedLog(time)
+
+    assertTrue(filthiestLog.isEmpty)
+  }
+
   /**
     * When checking for logs with segments ready for deletion
     * we shouldn't consider logs where cleanup.policy=delete
@@ -166,7 +282,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     */
   @Test
   def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
-    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val records = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes)
     val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
@@ -180,6 +296,21 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     assertEquals(None, cleanerManager.cleaningState(log.topicPartition))
   }
 
+  /**
+    * When looking for logs with segments ready to be deleted we shouldn't consider
+    * logs that have had their partition marked as uncleanable.
+    */
+  @Test
+  def testLogsWithSegmentsToDeleteShouldNotConsiderUncleanablePartitions(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    cleanerManager.markPartitionUncleanable(log.dir.getParent, topicPartition)
+
+    val readyToDelete = cleanerManager.deletableLogs().size
+    assertEquals("should have 0 logs ready to be deleted", 0, readyToDelete)
+  }
+
   /**
     * Test computation of cleanable range with no minimum compaction lag settings active
     */
@@ -193,7 +324,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     while(log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
-    val topicPartition = new TopicPartition("log", 0)
     val lastClean = Map(topicPartition -> 0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
@@ -224,7 +354,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     while (log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), leaderEpoch = 0)
 
-    val topicPartition = new TopicPartition("log", 0)
     val lastClean = Map(topicPartition -> 0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
@@ -250,7 +379,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     time.sleep(compactionLag + 1)
 
-    val topicPartition = new TopicPartition("log", 0)
     val lastClean = Map(topicPartition -> 0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition, lastClean, time.milliseconds)
     assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
@@ -259,7 +387,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
   @Test
   def testUndecidedTransactionalDataNotCleanable(): Unit = {
-    val topicPartition = new TopicPartition("log", 0)
     val compactionLag = 60 * 60 * 1000
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
@@ -315,21 +442,20 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
-    val tp = new TopicPartition("log", 0)
-    intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
+    intercept[IllegalStateException](cleanerManager.doneCleaning(topicPartition, log.dir, 1))
 
-    cleanerManager.setCleaningState(tp, LogCleaningPaused(1))
-    intercept[IllegalStateException](cleanerManager.doneCleaning(tp, log.dir, 1))
+    cleanerManager.setCleaningState(topicPartition, LogCleaningPaused(1))
+    intercept[IllegalStateException](cleanerManager.doneCleaning(topicPartition, log.dir, 1))
 
-    cleanerManager.setCleaningState(tp, LogCleaningInProgress)
-    cleanerManager.doneCleaning(tp, log.dir, 1)
-    assertTrue(cleanerManager.cleaningState(tp).isEmpty)
-    assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
+    cleanerManager.setCleaningState(topicPartition, LogCleaningInProgress)
+    cleanerManager.doneCleaning(topicPartition, log.dir, 1)
+    assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
 
-    cleanerManager.setCleaningState(tp, LogCleaningAborted)
-    cleanerManager.doneCleaning(tp, log.dir, 1)
-    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
-    assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
+    cleanerManager.setCleaningState(topicPartition, LogCleaningAborted)
+    cleanerManager.doneCleaning(topicPartition, log.dir, 1)
+    assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
   }
 
   @Test
@@ -337,7 +463,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
     val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
     val tp = new TopicPartition("log", 0)
 
     intercept[IllegalStateException](cleanerManager.doneDeleting(Seq(tp)))
@@ -352,21 +477,27 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     cleanerManager.setCleaningState(tp, LogCleaningAborted)
     cleanerManager.doneDeleting(Seq(tp))
     assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(tp).get)
-
   }
 
   private def createCleanerManager(log: Log): LogCleanerManager = {
     val logs = new Pool[TopicPartition, Log]()
-    logs.put(new TopicPartition("log", 0), log)
-    val cleanerManager = new LogCleanerManager(Array(logDir), logs, null)
-    cleanerManager
+    logs.put(topicPartition, log)
+    createCleanerManager(logs)
+  }
+
+  private def createCleanerManager(pool: Pool[TopicPartition, Log], toMock: Boolean = false): LogCleanerManager = {
+    if (toMock)
+      new LogCleanerManagerMock(Array(logDir), pool, null)
+    else
+      new LogCleanerManager(Array(logDir), pool, null)
   }
 
-  private def createLog(segmentSize: Int, cleanupPolicy: String): Log = {
+  private def createLog(segmentSize: Int, cleanupPolicy: String, segmentsCount: Int = 0): Log = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
     logProps.put(LogConfig.RetentionMsProp, 1: Integer)
     logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.05: java.lang.Double) // small for easier and clearer tests
 
     val config = LogConfig(logProps)
     val partitionDir = new File(logDir, "log-0")
@@ -380,6 +511,22 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
       maxProducerIdExpirationMs = 60 * 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10))
+    for (i <- 0 until segmentsCount) {
+      val startOffset = i * 10
+      val endOffset = startOffset + 10
+      val segment = LogUtils.createSegment(startOffset, logDir)
+      var lastTimestamp = 0L
+      val records = (startOffset until endOffset).map { offset =>
+        val currentTimestamp = time.milliseconds()
+        if (offset == endOffset - 1)
+          lastTimestamp = currentTimestamp
+
+        new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes)
+      }
+
+      segment.append(endOffset, lastTimestamp, endOffset, MemoryRecords.withRecords(CompressionType.NONE, records:_*))
+      log.addSegment(segment)
+    }
     log
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
new file mode 100755
index 00000000000..266bb391e2e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.util.Properties
+
+import kafka.api.KAFKA_0_11_0_IV0
+import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
+import kafka.server.KafkaConfig
+import kafka.server.checkpoints.OffsetCheckpointFile
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record._
+import org.junit.Assert._
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+
+import scala.Seq
+import scala.collection._
+
+/**
+ * This is an integration test that tests the fully integrated log cleaner
+ */
+@RunWith(value = classOf[Parameterized])
+class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends AbstractLogCleanerIntegrationTest {
+
+  val codec: CompressionType = CompressionType.forName(compressionCodec)
+  val time = new MockTime()
+
+  val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
+
+
+  @Test
+  def cleanerTest() {
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
+    val maxMessageSize = largeMessageSet.sizeInBytes
+
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
+    val log = cleaner.logs.get(topicPartitions(0))
+
+    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
+    val startSize = log.size
+    cleaner.startup()
+
+    val firstDirty = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty)
+    val compactedSize = log.logSegments.map(_.size).sum
+    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
+
+    checkLogAfterAppendingDups(log, startSize, appends)
+
+    val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
+    val largeMessageOffset = appendInfo.firstOffset.get
+
+    val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
+    val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dups
+    val firstDirty2 = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty2)
+
+    checkLogAfterAppendingDups(log, startSize, appends2)
+
+    // simulate deleting a partition by removing it from logs,
+    // force a checkpoint,
+    // and make sure it's gone from the checkpoint file
+    cleaner.logs.remove(topicPartitions(0))
+    cleaner.updateCheckpoints(logDir)
+    val checkpoints = new OffsetCheckpointFile(new File(logDir, cleaner.cleanerManager.offsetCheckpointFile)).read()
+    // we expect partition 0 to be gone
+    assertFalse(checkpoints.contains(topicPartitions(0)))
+  }
+
+  @Test
+  def testCleansCombinedCompactAndDeleteTopic(): Unit = {
+    val logProps  = new Properties()
+    val retentionMs: Integer = 100000
+    logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, "compact,delete")
+
+    def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String, Long)]) = {
+      cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backOffMs = 100L)
+      val log = cleaner.logs.get(topicPartitions(0))
+
+      val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
+      val startSize = log.size
+
+      log.onHighWatermarkIncremented(log.logEndOffset)
+
+      val firstDirty = log.activeSegment.baseOffset
+      cleaner.startup()
+
+      // should compact the log
+      checkLastCleaned("log", 0, firstDirty)
+      val compactedSize = log.logSegments.map(_.size).sum
+      assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
+      (log, messages)
+    }
+
+    val (log, _) = runCleanerAndCheckCompacted(100)
+    // should delete old segments
+    log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs))
+
+    TestUtils.waitUntilTrue(() => log.numberOfSegments == 1, "There should only be 1 segment remaining", 10000L)
+    assertEquals(1, log.numberOfSegments)
+
+    cleaner.shutdown()
+
+    // run the cleaner again to make sure there are no issues post deletion
+    val (log2, messages) = runCleanerAndCheckCompacted(20)
+    val read = readFromLog(log2)
+    assertEquals("Contents of the map shouldn't change", toMap(messages), toMap(read))
+  }
+
+  @Test
+  def testCleanerWithMessageFormatV0(): Unit = {
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0)
+    val maxMessageSize = codec match {
+      case CompressionType.NONE => largeMessageSet.sizeInBytes
+      case _ =>
+        // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
+        // increase because the broker offsets are larger than the ones assigned by the client
+        // adding `5` to the message set size is good enough for this test: it covers the increased message size while
+        // still being less than the overhead introduced by the conversion from message format version 0 to 1
+        largeMessageSet.sizeInBytes + 5
+    }
+
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
+
+    val log = cleaner.logs.get(topicPartitions(0))
+    val props = logConfigProperties(maxMessageSize = maxMessageSize)
+    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
+    log.config = new LogConfig(props)
+
+    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
+    val startSize = log.size
+    cleaner.startup()
+
+    val firstDirty = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty)
+    val compactedSize = log.logSegments.map(_.size).sum
+    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
+
+    checkLogAfterAppendingDups(log, startSize, appends)
+
+    val appends2: Seq[(Int, String, Long)] = {
+      val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
+      val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
+      val largeMessageOffset = appendInfo.firstOffset.get
+
+      // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
+      props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_11_0_IV0.version)
+      log.config = new LogConfig(props)
+      val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
+      val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V2)
+      appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2
+    }
+    val firstDirty2 = log.activeSegment.baseOffset
+    checkLastCleaned("log", 0, firstDirty2)
+
+    checkLogAfterAppendingDups(log, startSize, appends2)
+  }
+
+  @Test
+  def testCleaningNestedMessagesWithMultipleVersions(): Unit = {
+    val maxMessageSize = 192
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
+
+    val log = cleaner.logs.get(topicPartitions(0))
+    val props = logConfigProperties(maxMessageSize = maxMessageSize)
+    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
+    log.config = new LogConfig(props)
+
+    // with compression enabled, these messages will be written as a single message containing
+    // all of the individual messages
+    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
+    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
+
+    props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
+    log.config = new LogConfig(props)
+
+    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
+
+    val appends = appendsV0 ++ appendsV1
+
+    val startSize = log.size
+    cleaner.startup()
+
+    val firstDirty = log.activeSegment.baseOffset
+    assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1
+
+    checkLastCleaned("log", 0, firstDirty)
+    val compactedSize = log.logSegments.map(_.size).sum
+    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
+
+    checkLogAfterAppendingDups(log, startSize, appends)
+  }
+
+  @Test
+  def cleanerConfigUpdateTest() {
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
+    val maxMessageSize = largeMessageSet.sizeInBytes
+
+    cleaner = makeCleaner(partitions = topicPartitions, backOffMs = 1, maxMessageSize = maxMessageSize,
+      cleanerIoBufferSize = Some(1))
+    val log = cleaner.logs.get(topicPartitions(0))
+
+    writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
+    val startSize = log.size
+    cleaner.startup()
+    assertEquals(1, cleaner.cleanerCount)
+
+    // Verify no cleaning with LogCleanerIoBufferSizeProp=1
+    val firstDirty = log.activeSegment.baseOffset
+    val topicPartition = new TopicPartition("log", 0)
+    cleaner.awaitCleaned(topicPartition, firstDirty, maxWaitMs = 10)
+    assertTrue("Should not have cleaned", cleaner.cleanerManager.allCleanerCheckpoints.isEmpty)
+
+    def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): KafkaConfig = {
+      val props = TestUtils.createBrokerConfig(0, "localhost:2181")
+      props.put(KafkaConfig.LogCleanerThreadsProp, cleanerConfig.numThreads.toString)
+      props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, cleanerConfig.dedupeBufferSize.toString)
+      props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, cleanerConfig.dedupeBufferLoadFactor.toString)
+      props.put(KafkaConfig.LogCleanerIoBufferSizeProp, cleanerConfig.ioBufferSize.toString)
+      props.put(KafkaConfig.MessageMaxBytesProp, cleanerConfig.maxMessageSize.toString)
+      props.put(KafkaConfig.LogCleanerBackoffMsProp, cleanerConfig.backOffMs.toString)
+      props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, cleanerConfig.maxIoBytesPerSecond.toString)
+      KafkaConfig.fromProps(props)
+    }
+
+    // Verify cleaning done with larger LogCleanerIoBufferSizeProp
+    val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig)
+    val newConfig = kafkaConfigWithCleanerConfig(CleanerConfig(numThreads = 2,
+      dedupeBufferSize = cleaner.currentConfig.dedupeBufferSize,
+      dedupeBufferLoadFactor = cleaner.currentConfig.dedupeBufferLoadFactor,
+      ioBufferSize = 100000,
+      maxMessageSize = cleaner.currentConfig.maxMessageSize,
+      maxIoBytesPerSecond = cleaner.currentConfig.maxIoBytesPerSecond,
+      backOffMs = cleaner.currentConfig.backOffMs))
+    cleaner.reconfigure(oldConfig, newConfig)
+
+    assertEquals(2, cleaner.cleanerCount)
+    checkLastCleaned("log", 0, firstDirty)
+    val compactedSize = log.logSegments.map(_.size).sum
+    assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
+  }
+
+  private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) {
+    // wait until cleaning has advanced up to the given dirty offset; note that cleaning happens only when
+    // the "log dirty ratio" is higher than LogConfig.MinCleanableDirtyRatioProp
+    val topicPartition = new TopicPartition(topic, partitionId)
+    cleaner.awaitCleaned(topicPartition, firstDirty)
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(topicPartition)
+    assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned",
+      lastCleaned >= firstDirty)
+  }
+
+  private def checkLogAfterAppendingDups(log: Log, startSize: Long, appends: Seq[(Int, String, Long)]) {
+    val read = readFromLog(log)
+    assertEquals("Contents of the map shouldn't change", toMap(appends), toMap(read))
+    assertTrue(startSize > log.size)
+  }
+
+  private def toMap(messages: Iterable[(Int, String, Long)]): Map[Int, (String, Long)] = {
+    messages.map { case (key, value, offset) => key -> (value, offset) }.toMap
+  }
+
+  private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
+    import JavaConverters._
+    for (segment <- log.logSegments; deepLogEntry <- segment.log.records.asScala) yield {
+      val key = TestUtils.readString(deepLogEntry.key).toInt
+      val value = TestUtils.readString(deepLogEntry.value)
+      (key, value, deepLogEntry.offset)
+    }
+  }
+
+  private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
+                                        startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = {
+    val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
+      val payload = counter.toString
+      incCounter()
+      (key, payload)
+    }
+
+    val records = kvs.map { case (key, payload) =>
+      new SimpleRecord(key.toString.getBytes, payload.toString.getBytes)
+    }
+
+    val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
+    val offsets = appendInfo.firstOffset.get to appendInfo.lastOffset
+
+    kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
+  }
+
+}
+
+object LogCleanerParameterizedIntegrationTest {
+  @Parameters
+  def parameters: java.util.Collection[Array[String]] = {
+    val list = new java.util.ArrayList[Array[String]]()
+    for (codec <- CompressionType.values)
+      list.add(Array(codec.name))
+    list
+  }
+}
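
Note on the parameterized wiring above: the static @Parameters method supplies one Array per compression codec, and JUnit passes each array to the test class constructor, so the whole suite runs once per codec. Below is a minimal standalone sketch of that wiring; the class and test names are illustrative only and are not part of this PR.

package kafka.log

import org.apache.kafka.common.record.CompressionType
import org.junit.Assert._
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters

// Runs once per codec produced by the companion object's @Parameters method
@RunWith(value = classOf[Parameterized])
class CodecParameterizedSketch(compressionCodec: String) {
  val codec: CompressionType = CompressionType.forName(compressionCodec)

  @Test
  def codecNameRoundTrips(): Unit = {
    // Each parameterized run sees exactly one codec name
    assertEquals(compressionCodec, codec.name)
  }
}

object CodecParameterizedSketch {
  @Parameters
  def parameters: java.util.Collection[Array[String]] = {
    val list = new java.util.ArrayList[Array[String]]()
    for (codec <- CompressionType.values)
      list.add(Array(codec.name))
    list
  }
}
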
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 8976c680419..40b687443a0 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -40,12 +40,7 @@ class LogSegmentTest {
                     indexIntervalBytes: Int = 10,
                     maxSegmentMs: Int = Int.MaxValue,
                     time: Time = Time.SYSTEM): LogSegment = {
-    val ms = FileRecords.open(Log.logFile(logDir, offset))
-    val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
-    val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
-    val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
-    val seg = new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs = maxSegmentMs,
-      maxSegmentBytes = Int.MaxValue, time)
+    val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, maxSegmentMs, time)
     segments += seg
     seg
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogUtils.scala b/core/src/test/scala/unit/kafka/log/LogUtils.scala
new file mode 100644
index 00000000000..eb218952d0a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+
+import org.apache.kafka.common.record.FileRecords
+import org.apache.kafka.common.utils.Time
+
+object LogUtils {
+  /**
+    *  Create a segment with the given base offset
+    */
+  def createSegment(offset: Long,
+                    logDir: File,
+                    indexIntervalBytes: Int = 10,
+                    maxSegmentMs: Int = Int.MaxValue,
+                    time: Time = Time.SYSTEM): LogSegment = {
+    val ms = FileRecords.open(Log.logFile(logDir, offset))
+    val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
+    val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
+    val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
+
+    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, maxSegmentMs, Int.MaxValue, time)
+  }
+}
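
To illustrate the new LogUtils.createSegment helper in isolation, here is a rough standalone sketch (not part of the PR's diff) that creates a throwaway segment and appends a small batch, mirroring the createLog loop in LogCleanerManagerTest above; kafka.utils.TestUtils.tempDir() is assumed for the scratch directory.

package kafka.log

import kafka.utils.TestUtils
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

object LogUtilsSketch {
  def main(args: Array[String]): Unit = {
    val logDir = TestUtils.tempDir()
    // Empty segment rooted at base offset 0, using the helper's default index settings
    val segment = LogUtils.createSegment(0L, logDir)

    // Append offsets 0..9 as a single uncompressed batch
    val now = System.currentTimeMillis()
    val records = (0 until 10).map { i =>
      new SimpleRecord(now, s"key-$i".getBytes, s"value-$i".getBytes)
    }
    segment.append(9L, now, 9L, MemoryRecords.withRecords(CompressionType.NONE, records: _*))

    println(s"segment base offset: ${segment.baseOffset}, size in bytes: ${segment.size}")
    segment.close()
  }
}
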


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Improve LogCleaner behavior on error
> ------------------------------------
>
>                 Key: KAFKA-7215
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7215
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Minor
>
> For more detailed information see [KIP-346|https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)