Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/28 10:31:47 UTC

kafka git commit: KAFKA-4326; Refactor LogCleaner for better reuse of common copy/compress logic

Repository: kafka
Updated Branches:
  refs/heads/trunk 34e9cc5df -> 0dd9607f9


KAFKA-4326; Refactor LogCleaner for better reuse of common copy/compress logic

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #2053 from hachikuji/KAFKA-4326


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0dd9607f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0dd9607f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0dd9607f

Branch: refs/heads/trunk
Commit: 0dd9607f9ca50a385e78af06b66e0e90c1f37076
Parents: 34e9cc5
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Oct 28 06:31:29 2016 -0400
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Oct 28 06:31:29 2016 -0400

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 139 +++------------
 .../kafka/message/ByteBufferMessageSet.scala    | 178 ++++++++++++++++---
 .../kafka/log/LogCleanerIntegrationTest.scala   |  50 +++---
 3 files changed, 210 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
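
The patch centers on a new filterInto method on ByteBufferMessageSet, which takes over the per-message copy/recompress logic the cleaner previously inlined and reports aggregate counts back to the caller. A rough sketch of the API shape, with names taken from the diff below:

    // Aggregate result of one filtering pass over a message set.
    case class FilterResult(messagesRead: Int,
                            bytesRead: Int,
                            messagesRetained: Int,
                            bytesRetained: Int,
                            maxTimestamp: Long,
                            offsetOfMaxTimestamp: Long)

    // Copies every message that satisfies `filter` into `buffer`, recompressing a
    // compressed wrapper whenever an inner message is dropped or its magic value
    // has to be corrected (KAFKA-4298).
    def filterInto(buffer: ByteBuffer, filter: MessageAndOffset => Boolean): FilterResult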


http://git-wip-us.apache.org/repos/asf/kafka/blob/0dd9607f/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 34b0dbf..17824ec 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -448,133 +448,42 @@ private[log] class Cleaner(val id: Int,
                              retainDeletes: Boolean,
                              maxLogMessageSize: Int,
                              stats: CleanerStats) {
+    def shouldRetain(messageAndOffset: MessageAndOffset): Boolean =
+      shouldRetainMessage(source, map, retainDeletes, messageAndOffset, stats)
+
     var position = 0
     while (position < source.log.sizeInBytes) {
       checkDone(topicAndPartition)
       // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
       readBuffer.clear()
       writeBuffer.clear()
-      var maxTimestamp = Message.NoTimestamp
-      var offsetOfMaxTimestamp = -1L
-      val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
+
+      source.log.readInto(readBuffer, position)
+      val messages = new ByteBufferMessageSet(readBuffer)
       throttler.maybeThrottle(messages.sizeInBytes)
-      // check each message to see if it is to be retained
-      var messagesRead = 0
-      for (shallowMessageAndOffset <- messages.shallowIterator) {
-        val shallowMessage = shallowMessageAndOffset.message
-        val shallowOffset = shallowMessageAndOffset.offset
-        val size = MessageSet.entrySize(shallowMessageAndOffset.message)
-
-        stats.readMessage(size)
-        if (shallowMessage.compressionCodec == NoCompressionCodec) {
-          if (shouldRetainMessage(source, map, retainDeletes, shallowMessageAndOffset, stats)) {
-            ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
-            stats.recopyMessage(size)
-            if (shallowMessage.timestamp > maxTimestamp) {
-              maxTimestamp = shallowMessage.timestamp
-              offsetOfMaxTimestamp = shallowOffset
-            }
-          }
-          messagesRead += 1
-        } else {
-          // We use the absolute offset to decide whether to retain the message or not (this is handled by the
-          // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version
-          // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic
-          // of the inner messages. This will be fixed as we recopy the messages to the destination segment.
-
-          var writeOriginalMessageSet = true
-          val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
-          val shallowMagic = shallowMessage.magic
-
-          for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) {
-            messagesRead += 1
-            if (shouldRetainMessage(source, map, retainDeletes, deepMessageAndOffset, stats)) {
-              // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-              // the corrupted entry with correct data.
-              if (shallowMagic != deepMessageAndOffset.message.magic)
-                writeOriginalMessageSet = false
-
-              retainedMessages += deepMessageAndOffset
-              // We need the max timestamp and last offset for time index
-              if (deepMessageAndOffset.message.timestamp > maxTimestamp)
-                maxTimestamp = deepMessageAndOffset.message.timestamp
-            } else {
-              writeOriginalMessageSet = false
-            }
-          }
-          offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
-          // There are no messages compacted out and no message format conversion, write the original message set back
-          if (writeOriginalMessageSet)
-            ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
-          else if (retainedMessages.nonEmpty) {
-            val retainedSize = compressMessages(writeBuffer, shallowMessage.compressionCodec, retainedMessages)
-            stats.recopyMessage(retainedSize)
-          }
-        }
-      }
+      val result = messages.filterInto(writeBuffer, shouldRetain)
+
+      stats.readMessages(result.messagesRead, result.bytesRead)
+      stats.recopyMessages(result.messagesRetained, result.bytesRetained)
+
+      position += result.bytesRead
 
-      position += messages.validBytes
       // if any messages are to be retained, write them out
       if (writeBuffer.position > 0) {
         writeBuffer.flip()
         val retained = new ByteBufferMessageSet(writeBuffer)
-        dest.append(firstOffset = retained.head.offset, largestTimestamp = maxTimestamp,
-          offsetOfLargestTimestamp = offsetOfMaxTimestamp, messages = retained)
+        dest.append(firstOffset = retained.head.offset, largestTimestamp = result.maxTimestamp,
+          offsetOfLargestTimestamp = result.offsetOfMaxTimestamp, messages = retained)
         throttler.maybeThrottle(writeBuffer.limit)
       }
       
       // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
-      if (readBuffer.limit > 0 && messagesRead == 0)
+      if (readBuffer.limit > 0 && result.messagesRead == 0)
         growBuffers(maxLogMessageSize)
     }
     restoreBuffers()
   }
 
-  private def compressMessages(buffer: ByteBuffer,
-                               compressionCodec: CompressionCodec,
-                               messageAndOffsets: Seq[MessageAndOffset]): Int = {
-    require(compressionCodec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
-
-    if (messageAndOffsets.isEmpty) {
-      0
-    } else {
-      val messages = messageAndOffsets.map(_.message)
-      val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
-
-      // ensure that we use the magic from the first message in the set when writing the wrapper
-      // message in order to fix message sets corrupted by KAFKA-4298
-      val magic = magicAndTimestamp.magic
-
-      val firstMessageOffset = messageAndOffsets.head
-      val firstAbsoluteOffset = firstMessageOffset.offset
-      var offset = -1L
-      val timestampType = firstMessageOffset.message.timestampType
-      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
-      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magic) { outputStream =>
-        val output = new DataOutputStream(CompressionFactory(compressionCodec, magic, outputStream))
-        try {
-          for (messageAndOffset <- messageAndOffsets) {
-            offset = messageAndOffset.offset
-            val innerOffset = if (magic > Message.MagicValue_V0)
-              // The offset of the messages are absolute offset, compute the inner offset.
-              messageAndOffset.offset - firstAbsoluteOffset
-            else
-              offset
-
-            val message = messageAndOffset.message
-            output.writeLong(innerOffset)
-            output.writeInt(message.size)
-            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
-          }
-        } finally {
-          output.close()
-        }
-      }
-      ByteBufferMessageSet.writeMessage(buffer, messageWriter, offset)
-      messageWriter.size + MessageSet.LogOverhead
-    }
-  }
-
   private def shouldRetainMessage(source: kafka.log.LogSegment,
                                   map: kafka.log.OffsetMap,
                                   retainDeletes: Boolean,
@@ -709,8 +618,10 @@ private[log] class Cleaner(val id: Int,
     while (position < segment.log.sizeInBytes) {
       checkDone(topicAndPartition)
       readBuffer.clear()
-      val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
+      segment.log.readInto(readBuffer, position)
+      val messages = new ByteBufferMessageSet(readBuffer)
       throttler.maybeThrottle(messages.sizeInBytes)
+
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
@@ -730,7 +641,7 @@ private[log] class Cleaner(val id: Int,
         growBuffers(maxLogMessageSize)
     }
     restoreBuffers()
-    return false
+    false
   }
 }
 
@@ -750,18 +661,18 @@ private class CleanerStats(time: Time = SystemTime) {
   var messagesWritten = 0L
   var bufferUtilization = 0.0d
 
-  def readMessage(size: Int) {
-    messagesRead += 1
-    bytesRead += size
+  def readMessages(messagesRead: Int, bytesRead: Int) {
+    this.messagesRead += messagesRead
+    this.bytesRead += bytesRead
   }
 
   def invalidMessage() {
     invalidMessagesRead += 1
   }
   
-  def recopyMessage(size: Int) {
-    messagesWritten += 1
-    bytesWritten += size
+  def recopyMessages(messagesWritten: Int, bytesWritten: Int) {
+    this.messagesWritten += messagesWritten
+    this.bytesWritten += bytesWritten
   }
 
   def indexMessagesRead(size: Int) {

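Pieced together from the hunks above, the body of the cleaner's copy loop now reduces to roughly the following (a condensed sketch rather than a verbatim excerpt; readBuffer, writeBuffer, throttler and stats come from the surrounding Cleaner, while source and dest are the segments being cleaned):

    source.log.readInto(readBuffer, position)
    val messages = new ByteBufferMessageSet(readBuffer)
    throttler.maybeThrottle(messages.sizeInBytes)

    // The retain/recompress decision now lives in the message set itself.
    val result = messages.filterInto(writeBuffer, shouldRetain)
    stats.readMessages(result.messagesRead, result.bytesRead)
    stats.recopyMessages(result.messagesRetained, result.bytesRetained)
    position += result.bytesRead

    // Whatever was retained is appended to the destination segment, together with
    // the max-timestamp bookkeeping needed for the time index.
    if (writeBuffer.position > 0) {
      writeBuffer.flip()
      val retained = new ByteBufferMessageSet(writeBuffer)
      dest.append(firstOffset = retained.head.offset, largestTimestamp = result.maxTimestamp,
        offsetOfLargestTimestamp = result.offsetOfMaxTimestamp, messages = retained)
      throttler.maybeThrottle(writeBuffer.limit)
    }
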
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dd9607f/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 850b0e0..a33bc4b 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -24,11 +24,13 @@ import java.nio.channels._
 import java.io._
 import java.util.ArrayDeque
 
+import kafka.message.ByteBufferMessageSet.FilterResult
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 object ByteBufferMessageSet {
 
@@ -49,29 +51,10 @@ object ByteBufferMessageSet {
         case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
         case None => MessageSet.magicAndLargestTimestamp(messages)
       }
-      var offset = -1L
-      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
-      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
-        val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
-        try {
-          for (message <- messages) {
-            offset = offsetAssigner.nextAbsoluteOffset()
-            if (message.magic != magicAndTimestamp.magic)
-              throw new IllegalArgumentException("Messages in the message set must have same magic value")
-            // Use inner offset if magic value is greater than 0
-            if (magicAndTimestamp.magic > Message.MagicValue_V0)
-              output.writeLong(offsetAssigner.toInnerOffset(offset))
-            else
-              output.writeLong(offset)
-            output.writeInt(message.size)
-            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
-          }
-        } finally {
-          output.close()
-        }
-      }
+      val (messageWriter, lastOffset) = writeCompressedMessages(compressionCodec, offsetAssigner, magicAndTimestamp,
+        timestampType, messages)
       val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
-      writeMessage(buffer, messageWriter, offset)
+      writeMessage(buffer, messageWriter, lastOffset)
       buffer.rewind()
       buffer
     }
@@ -165,6 +148,77 @@ object ByteBufferMessageSet {
     }
   }
 
+  private def writeCompressedMessages(codec: CompressionCodec,
+                                      offsetAssigner: OffsetAssigner,
+                                      magicAndTimestamp: MagicAndTimestamp,
+                                      timestampType: TimestampType,
+                                      messages: Seq[Message]): (MessageWriter, Long) = {
+    require(codec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
+    require(messages.nonEmpty, "cannot write empty compressed message set")
+
+    var offset = -1L
+    val magic = magicAndTimestamp.magic
+    val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
+    messageWriter.write(
+      codec = codec,
+      timestamp = magicAndTimestamp.timestamp,
+      timestampType = timestampType,
+      magicValue = magic) { outputStream =>
+
+      val output = new DataOutputStream(CompressionFactory(codec, magic, outputStream))
+      try {
+        for (message <- messages) {
+          offset = offsetAssigner.nextAbsoluteOffset()
+
+          if (message.magic != magicAndTimestamp.magic)
+            throw new IllegalArgumentException("Messages in the message set must have same magic value")
+
+          // Use inner offset if magic value is greater than 0
+          val innerOffset = if (magicAndTimestamp.magic > Message.MagicValue_V0)
+            offsetAssigner.toInnerOffset(offset)
+          else
+            offset
+
+          output.writeLong(innerOffset)
+          output.writeInt(message.size)
+          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+        }
+      } finally {
+        output.close()
+      }
+    }
+
+    (messageWriter, offset)
+  }
+
+  private[kafka] def writeCompressedMessages(buffer: ByteBuffer,
+                                             codec: CompressionCodec,
+                                             messageAndOffsets: Seq[MessageAndOffset]): Int = {
+    require(codec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
+
+    if (messageAndOffsets.isEmpty)
+      0
+    else {
+      val messages = messageAndOffsets.map(_.message)
+      val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
+
+      // ensure that we use the magic from the first message in the set when writing the wrapper
+      // message in order to fix message sets corrupted by KAFKA-4298
+      val magic = magicAndTimestamp.magic
+
+      val firstMessageAndOffset = messageAndOffsets.head
+      val firstAbsoluteOffset = firstMessageAndOffset.offset
+      val offsetAssigner = OffsetAssigner(firstAbsoluteOffset, magic, messageAndOffsets)
+      val timestampType = firstMessageAndOffset.message.timestampType
+
+      val (messageWriter, lastOffset) = writeCompressedMessages(codec, offsetAssigner, magicAndTimestamp,
+        timestampType, messages)
+
+      writeMessage(buffer, messageWriter, lastOffset)
+      messageWriter.size + MessageSet.LogOverhead
+    }
+  }
+
   private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
     buffer.putLong(offset)
     buffer.putInt(message.size)
@@ -177,6 +231,14 @@ object ByteBufferMessageSet {
     buffer.putInt(messageWriter.size)
     messageWriter.writeTo(buffer)
   }
+
+
+  case class FilterResult(messagesRead: Int,
+                          bytesRead: Int,
+                          messagesRetained: Int,
+                          bytesRetained: Int,
+                          maxTimestamp: Long,
+                          offsetOfMaxTimestamp: Long)
 }
 
 private object OffsetAssigner {
@@ -184,6 +246,9 @@ private object OffsetAssigner {
   def apply(offsetCounter: LongRef, size: Int): OffsetAssigner =
     new OffsetAssigner(offsetCounter.value to offsetCounter.addAndGet(size))
 
+  def apply(baseOffset: Long, magic: Byte, messageAndOffsets: Seq[MessageAndOffset]): OffsetAssigner =
+    new OffsetAssigner(messageAndOffsets.map(_.offset))
+
 }
 
 private class OffsetAssigner(offsets: Seq[Long]) {
@@ -389,6 +454,75 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
     }
   }
 
+  def filterInto(buffer: ByteBuffer,
+                 filter: MessageAndOffset => Boolean): FilterResult = {
+    var maxTimestamp = Message.NoTimestamp
+    var offsetOfMaxTimestamp = -1L
+    var messagesRead = 0
+    var bytesRead = 0
+    var messagesRetained = 0
+    var bytesRetained = 0
+
+    for (shallowMessageAndOffset <- shallowIterator) {
+      val shallowMessage = shallowMessageAndOffset.message
+      val shallowOffset = shallowMessageAndOffset.offset
+      val size = MessageSet.entrySize(shallowMessageAndOffset.message)
+
+      messagesRead += 1
+      bytesRead += size
+
+      if (shallowMessageAndOffset.message.compressionCodec == NoCompressionCodec) {
+        if (filter(shallowMessageAndOffset)) {
+          ByteBufferMessageSet.writeMessage(buffer, shallowMessage, shallowOffset)
+          messagesRetained += 1
+          bytesRetained += size
+
+          if (shallowMessage.timestamp > maxTimestamp) {
+            maxTimestamp = shallowMessage.timestamp
+            offsetOfMaxTimestamp = shallowOffset
+          }
+        }
+        messagesRead += 1
+      } else {
+        // We use the absolute offset to decide whether to retain the message or not (this is handled by the
+        // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version
+        // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic
+        // of the inner messages. This will be fixed as we recopy the messages to the destination segment.
+
+        var writeOriginalMessageSet = true
+        val retainedMessages = ArrayBuffer[MessageAndOffset]()
+        val shallowMagic = shallowMessage.magic
+
+        for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) {
+          messagesRead += 1
+          if (filter(deepMessageAndOffset)) {
+            // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+            // the corrupted entry with correct data.
+            if (shallowMagic != deepMessageAndOffset.message.magic)
+              writeOriginalMessageSet = false
+
+            retainedMessages += deepMessageAndOffset
+            // We need the max timestamp and last offset for time index
+            if (deepMessageAndOffset.message.timestamp > maxTimestamp)
+              maxTimestamp = deepMessageAndOffset.message.timestamp
+          }
+          else writeOriginalMessageSet = false
+        }
+        offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
+        // There are no messages compacted out and no message format conversion, write the original message set back
+        if (writeOriginalMessageSet)
+          ByteBufferMessageSet.writeMessage(buffer, shallowMessage, shallowOffset)
+        else if (retainedMessages.nonEmpty) {
+          val compressedSize = ByteBufferMessageSet.writeCompressedMessages(buffer, shallowMessage.compressionCodec, retainedMessages)
+          messagesRetained += 1
+          bytesRetained += compressedSize
+        }
+      }
+    }
+
+    FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxTimestamp, offsetOfMaxTimestamp)
+  }
+
   /**
    * Update the offsets for this message set and do further validation on messages including:
    * 1. Messages for compacted topics must have keys

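Because filterInto carries no access modifier, it can also be exercised on its own against an in-memory message set. A hypothetical, minimal usage sketch, assuming the kafka-core module at this commit is on the classpath; the predicate here (keep only keyed messages) merely stands in for the cleaner's shouldRetainMessage:

    import java.nio.ByteBuffer
    import kafka.message._

    object FilterIntoExample extends App {
      // Two uncompressed messages, only the first of which carries a key.
      val messages = new ByteBufferMessageSet(NoCompressionCodec,
        new Message("v1".getBytes, "k1".getBytes, Message.NoTimestamp, Message.MagicValue_V1),
        new Message("v2".getBytes, null: Array[Byte], Message.NoTimestamp, Message.MagicValue_V1))

      val out = ByteBuffer.allocate(messages.sizeInBytes)
      // Retain only messages that have a key; the unkeyed one is dropped.
      val result = messages.filterInto(out, _.message.hasKey)

      println(s"read=${result.messagesRead}, retained=${result.messagesRetained}, " +
        s"bytesRetained=${result.bytesRetained}")
    }
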
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dd9607f/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 40030cb..250c8b8 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -73,9 +73,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
     checkLogAfterAppendingDups(log, startSize, appends)
 
-    log.append(largeMessageSet, assignOffsets = true)
+    val appendInfo = log.append(largeMessageSet, assignOffsets = true)
+    val largeMessageOffset = appendInfo.firstOffset
+
     val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
-    val appends2 = appends ++ Seq(largeMessageKey -> largeMessageValue) ++ dups
+    val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dups
     val firstDirty2 = log.activeSegment.baseOffset
     checkLastCleaned("log", 0, firstDirty2)
 
@@ -98,11 +100,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
     logProps.put(LogConfig.CleanupPolicyProp, "compact,delete")
 
-    def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String)]) = {
+    def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String, Long)]) = {
       cleaner = makeCleaner(parts = 1, propertyOverrides = logProps, logCleanerBackOffMillis = 100L)
       val log = cleaner.logs.get(topics(0))
 
-      val messages: Seq[(Int, String)] = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
+      val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
       val startSize = log.size
 
       val firstDirty = log.activeSegment.baseOffset
@@ -115,7 +117,6 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
       (log, messages)
     }
 
-
     val (log, _) = runCleanerAndCheckCompacted(100)
     // should delete old segments
     log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs))
@@ -128,10 +129,10 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     // run the cleaner again to make sure if there are no issues post deletion
     val (log2, messages) = runCleanerAndCheckCompacted(20)
     val read = readFromLog(log2)
-    assertEquals("Contents of the map shouldn't change", messages.toMap, read.toMap)
+    assertEquals("Contents of the map shouldn't change", toMap(messages), toMap(read))
   }
 
-  // returns (value, ByteBufferMessag eSet)
+  // returns (value, ByteBufferMessageSet)
   private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, ByteBufferMessageSet) = {
     def messageValue(length: Int): String = {
       val random = new Random(0)
@@ -175,15 +176,16 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
     checkLogAfterAppendingDups(log, startSize, appends)
 
-    val appends2: Seq[(Int, String)] = {
+    val appends2: Seq[(Int, String, Long)] = {
       val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
-      log.append(largeMessageSet, assignOffsets = true)
+      val appendInfo = log.append(largeMessageSet, assignOffsets = true)
+      val largeMessageOffset = appendInfo.firstOffset
 
       // also add some messages with version 1 to check that we handle mixed format versions correctly
       props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
       log.config = new LogConfig(props)
       val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V1)
-      appends ++ dupsV0 ++ Seq(largeMessageKey -> largeMessageValue) ++ dupsV1
+      appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1
     }
     val firstDirty2 = log.activeSegment.baseOffset
     checkLastCleaned("log", 0, firstDirty2)
@@ -232,18 +234,22 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than
     // LogConfig.MinCleanableDirtyRatioProp
     cleaner.awaitCleaned(topic, partitionId, firstDirty)
-    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition(topic, partitionId)).get
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(TopicAndPartition(topic, partitionId))
     assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned",
       lastCleaned >= firstDirty)
   }
 
-  private def checkLogAfterAppendingDups(log: Log, startSize: Long, appends: Seq[(Int, String)]) {
+  private def checkLogAfterAppendingDups(log: Log, startSize: Long, appends: Seq[(Int, String, Long)]) {
     val read = readFromLog(log)
-    assertEquals("Contents of the map shouldn't change", appends.toMap, read.toMap)
+    assertEquals("Contents of the map shouldn't change", toMap(appends), toMap(read))
     assertTrue(startSize > log.size)
   }
 
-  private def readFromLog(log: Log): Iterable[(Int, String)] = {
+  private def toMap(messages: Iterable[(Int, String, Long)]): Map[Int, (String, Long)] = {
+    messages.map { case (key, value, offset) => key -> (value, offset) }.toMap
+  }
+
+  private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
 
     def messageIterator(entry: MessageAndOffset): Iterator[MessageAndOffset] =
       // create single message iterator or deep iterator depending on compression codec
@@ -253,23 +259,23 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- messageIterator(entry)) yield {
       val key = TestUtils.readString(messageAndOffset.message.key).toInt
       val value = TestUtils.readString(messageAndOffset.message.payload)
-      key -> value
+      (key, value, messageAndOffset.offset)
     }
   }
 
   private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec,
-                        startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String)] = {
+                        startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String, Long)] = {
     for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val payload = counter.toString
-      log.append(TestUtils.singleMessageSet(payload = payload.toString.getBytes, codec = codec,
+      val appendInfo = log.append(TestUtils.singleMessageSet(payload = payload.toString.getBytes, codec = codec,
         key = key.toString.getBytes, magicValue = magicValue), assignOffsets = true)
       counter += 1
-      (key, payload)
+      (key, payload, appendInfo.firstOffset)
     }
   }
 
   private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec,
-                                        startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String)] = {
+                                        startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String, Long)] = {
     val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val payload = counter.toString
       counter += 1
@@ -281,8 +287,10 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     }
 
     val messageSet = new ByteBufferMessageSet(compressionCodec = codec, messages: _*)
-    log.append(messageSet, assignOffsets = true)
-    kvs
+    val appendInfo = log.append(messageSet, assignOffsets = true)
+    val offsets = appendInfo.firstOffset to appendInfo.lastOffset
+
+    kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
   }
 
   @After
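
One detail worth calling out in the test changes: the write helpers now return (key, value, offset) triples instead of (key, value) pairs, and assertions go through the new toMap helper so that only the last entry per key is compared. A small illustration with made-up values of why that comparison matches what a compacted log retains (toMap as defined in the diff above):

    def toMap(messages: Iterable[(Int, String, Long)]): Map[Int, (String, Long)] =
      messages.map { case (key, value, offset) => key -> (value, offset) }.toMap

    val appends = Seq((1, "a", 0L), (1, "b", 1L), (2, "c", 2L))  // duplicate writes for key 1
    val read    = Seq((1, "b", 1L), (2, "c", 2L))                // what survives compaction

    // Map construction keeps the last value per key, so the duplicated appends
    // collapse to exactly the entries the cleaned log retains.
    assert(toMap(appends) == toMap(read))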