You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/28 10:31:47 UTC
kafka git commit: KAFKA-4326; Refactor LogCleaner for better reuse of common copy/compress logic
Repository: kafka
Updated Branches:
refs/heads/trunk 34e9cc5df -> 0dd9607f9
KAFKA-4326; Refactor LogCleaner for better reuse of common copy/compress logic
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #2053 from hachikuji/KAFKA-4326
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0dd9607f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0dd9607f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0dd9607f
Branch: refs/heads/trunk
Commit: 0dd9607f9ca50a385e78af06b66e0e90c1f37076
Parents: 34e9cc5
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Oct 28 06:31:29 2016 -0400
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Oct 28 06:31:29 2016 -0400
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogCleaner.scala | 139 +++------------
.../kafka/message/ByteBufferMessageSet.scala | 178 ++++++++++++++++---
.../kafka/log/LogCleanerIntegrationTest.scala | 50 +++---
3 files changed, 210 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dd9607f/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 34b0dbf..17824ec 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -448,133 +448,42 @@ private[log] class Cleaner(val id: Int,
retainDeletes: Boolean,
maxLogMessageSize: Int,
stats: CleanerStats) {
+ def shouldRetain(messageAndOffset: MessageAndOffset): Boolean =
+ shouldRetainMessage(source, map, retainDeletes, messageAndOffset, stats)
+
var position = 0
while (position < source.log.sizeInBytes) {
checkDone(topicAndPartition)
// read a chunk of messages and copy any that are to be retained to the write buffer to be written out
readBuffer.clear()
writeBuffer.clear()
- var maxTimestamp = Message.NoTimestamp
- var offsetOfMaxTimestamp = -1L
- val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
+
+ source.log.readInto(readBuffer, position)
+ val messages = new ByteBufferMessageSet(readBuffer)
throttler.maybeThrottle(messages.sizeInBytes)
- // check each message to see if it is to be retained
- var messagesRead = 0
- for (shallowMessageAndOffset <- messages.shallowIterator) {
- val shallowMessage = shallowMessageAndOffset.message
- val shallowOffset = shallowMessageAndOffset.offset
- val size = MessageSet.entrySize(shallowMessageAndOffset.message)
-
- stats.readMessage(size)
- if (shallowMessage.compressionCodec == NoCompressionCodec) {
- if (shouldRetainMessage(source, map, retainDeletes, shallowMessageAndOffset, stats)) {
- ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
- stats.recopyMessage(size)
- if (shallowMessage.timestamp > maxTimestamp) {
- maxTimestamp = shallowMessage.timestamp
- offsetOfMaxTimestamp = shallowOffset
- }
- }
- messagesRead += 1
- } else {
- // We use the absolute offset to decide whether to retain the message or not (this is handled by the
- // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version
- // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic
- // of the inner messages. This will be fixed as we recopy the messages to the destination segment.
-
- var writeOriginalMessageSet = true
- val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
- val shallowMagic = shallowMessage.magic
-
- for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) {
- messagesRead += 1
- if (shouldRetainMessage(source, map, retainDeletes, deepMessageAndOffset, stats)) {
- // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
- // the corrupted entry with correct data.
- if (shallowMagic != deepMessageAndOffset.message.magic)
- writeOriginalMessageSet = false
-
- retainedMessages += deepMessageAndOffset
- // We need the max timestamp and last offset for time index
- if (deepMessageAndOffset.message.timestamp > maxTimestamp)
- maxTimestamp = deepMessageAndOffset.message.timestamp
- } else {
- writeOriginalMessageSet = false
- }
- }
- offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
- // There are no messages compacted out and no message format conversion, write the original message set back
- if (writeOriginalMessageSet)
- ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
- else if (retainedMessages.nonEmpty) {
- val retainedSize = compressMessages(writeBuffer, shallowMessage.compressionCodec, retainedMessages)
- stats.recopyMessage(retainedSize)
- }
- }
- }
+ val result = messages.filterInto(writeBuffer, shouldRetain)
+
+ stats.readMessages(result.messagesRead, result.bytesRead)
+ stats.recopyMessages(result.messagesRetained, result.bytesRetained)
+
+ position += result.bytesRead
- position += messages.validBytes
// if any messages are to be retained, write them out
if (writeBuffer.position > 0) {
writeBuffer.flip()
val retained = new ByteBufferMessageSet(writeBuffer)
- dest.append(firstOffset = retained.head.offset, largestTimestamp = maxTimestamp,
- offsetOfLargestTimestamp = offsetOfMaxTimestamp, messages = retained)
+ dest.append(firstOffset = retained.head.offset, largestTimestamp = result.maxTimestamp,
+ offsetOfLargestTimestamp = result.offsetOfMaxTimestamp, messages = retained)
throttler.maybeThrottle(writeBuffer.limit)
}
// if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
- if (readBuffer.limit > 0 && messagesRead == 0)
+ if (readBuffer.limit > 0 && result.messagesRead == 0)
growBuffers(maxLogMessageSize)
}
restoreBuffers()
}
- private def compressMessages(buffer: ByteBuffer,
- compressionCodec: CompressionCodec,
- messageAndOffsets: Seq[MessageAndOffset]): Int = {
- require(compressionCodec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
-
- if (messageAndOffsets.isEmpty) {
- 0
- } else {
- val messages = messageAndOffsets.map(_.message)
- val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
-
- // ensure that we use the magic from the first message in the set when writing the wrapper
- // message in order to fix message sets corrupted by KAFKA-4298
- val magic = magicAndTimestamp.magic
-
- val firstMessageOffset = messageAndOffsets.head
- val firstAbsoluteOffset = firstMessageOffset.offset
- var offset = -1L
- val timestampType = firstMessageOffset.message.timestampType
- val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
- messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magic) { outputStream =>
- val output = new DataOutputStream(CompressionFactory(compressionCodec, magic, outputStream))
- try {
- for (messageAndOffset <- messageAndOffsets) {
- offset = messageAndOffset.offset
- val innerOffset = if (magic > Message.MagicValue_V0)
- // The offset of the messages are absolute offset, compute the inner offset.
- messageAndOffset.offset - firstAbsoluteOffset
- else
- offset
-
- val message = messageAndOffset.message
- output.writeLong(innerOffset)
- output.writeInt(message.size)
- output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
- }
- } finally {
- output.close()
- }
- }
- ByteBufferMessageSet.writeMessage(buffer, messageWriter, offset)
- messageWriter.size + MessageSet.LogOverhead
- }
- }
-
private def shouldRetainMessage(source: kafka.log.LogSegment,
map: kafka.log.OffsetMap,
retainDeletes: Boolean,
@@ -709,8 +618,10 @@ private[log] class Cleaner(val id: Int,
while (position < segment.log.sizeInBytes) {
checkDone(topicAndPartition)
readBuffer.clear()
- val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
+ segment.log.readInto(readBuffer, position)
+ val messages = new ByteBufferMessageSet(readBuffer)
throttler.maybeThrottle(messages.sizeInBytes)
+
val startPosition = position
for (entry <- messages) {
val message = entry.message
@@ -730,7 +641,7 @@ private[log] class Cleaner(val id: Int,
growBuffers(maxLogMessageSize)
}
restoreBuffers()
- return false
+ false
}
}
@@ -750,18 +661,18 @@ private class CleanerStats(time: Time = SystemTime) {
var messagesWritten = 0L
var bufferUtilization = 0.0d
- def readMessage(size: Int) {
- messagesRead += 1
- bytesRead += size
+ def readMessages(messagesRead: Int, bytesRead: Int) {
+ this.messagesRead += messagesRead
+ this.bytesRead += bytesRead
}
def invalidMessage() {
invalidMessagesRead += 1
}
- def recopyMessage(size: Int) {
- messagesWritten += 1
- bytesWritten += size
+ def recopyMessages(messagesWritten: Int, bytesWritten: Int) {
+ this.messagesWritten += messagesWritten
+ this.bytesWritten += bytesWritten
}
def indexMessagesRead(size: Int) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dd9607f/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 850b0e0..a33bc4b 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -24,11 +24,13 @@ import java.nio.channels._
import java.io._
import java.util.ArrayDeque
+import kafka.message.ByteBufferMessageSet.FilterResult
import org.apache.kafka.common.errors.InvalidTimestampException
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.utils.Utils
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
object ByteBufferMessageSet {
@@ -49,29 +51,10 @@ object ByteBufferMessageSet {
case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
case None => MessageSet.magicAndLargestTimestamp(messages)
}
- var offset = -1L
- val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
- messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
- val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
- try {
- for (message <- messages) {
- offset = offsetAssigner.nextAbsoluteOffset()
- if (message.magic != magicAndTimestamp.magic)
- throw new IllegalArgumentException("Messages in the message set must have same magic value")
- // Use inner offset if magic value is greater than 0
- if (magicAndTimestamp.magic > Message.MagicValue_V0)
- output.writeLong(offsetAssigner.toInnerOffset(offset))
- else
- output.writeLong(offset)
- output.writeInt(message.size)
- output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
- }
- } finally {
- output.close()
- }
- }
+ val (messageWriter, lastOffset) = writeCompressedMessages(compressionCodec, offsetAssigner, magicAndTimestamp,
+ timestampType, messages)
val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
- writeMessage(buffer, messageWriter, offset)
+ writeMessage(buffer, messageWriter, lastOffset)
buffer.rewind()
buffer
}
@@ -165,6 +148,77 @@ object ByteBufferMessageSet {
}
}
+ private def writeCompressedMessages(codec: CompressionCodec,
+ offsetAssigner: OffsetAssigner,
+ magicAndTimestamp: MagicAndTimestamp,
+ timestampType: TimestampType,
+ messages: Seq[Message]): (MessageWriter, Long) = {
+ require(codec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
+ require(messages.nonEmpty, "cannot write empty compressed message set")
+
+ var offset = -1L
+ val magic = magicAndTimestamp.magic
+ val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
+ messageWriter.write(
+ codec = codec,
+ timestamp = magicAndTimestamp.timestamp,
+ timestampType = timestampType,
+ magicValue = magic) { outputStream =>
+
+ val output = new DataOutputStream(CompressionFactory(codec, magic, outputStream))
+ try {
+ for (message <- messages) {
+ offset = offsetAssigner.nextAbsoluteOffset()
+
+ if (message.magic != magicAndTimestamp.magic)
+ throw new IllegalArgumentException("Messages in the message set must have same magic value")
+
+ // Use inner offset if magic value is greater than 0
+ val innerOffset = if (magicAndTimestamp.magic > Message.MagicValue_V0)
+ offsetAssigner.toInnerOffset(offset)
+ else
+ offset
+
+ output.writeLong(innerOffset)
+ output.writeInt(message.size)
+ output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+ }
+ } finally {
+ output.close()
+ }
+ }
+
+ (messageWriter, offset)
+ }
+
+ private[kafka] def writeCompressedMessages(buffer: ByteBuffer,
+ codec: CompressionCodec,
+ messageAndOffsets: Seq[MessageAndOffset]): Int = {
+ require(codec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
+
+ if (messageAndOffsets.isEmpty)
+ 0
+ else {
+ val messages = messageAndOffsets.map(_.message)
+ val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
+
+ // ensure that we use the magic from the first message in the set when writing the wrapper
+ // message in order to fix message sets corrupted by KAFKA-4298
+ val magic = magicAndTimestamp.magic
+
+ val firstMessageAndOffset = messageAndOffsets.head
+ val firstAbsoluteOffset = firstMessageAndOffset.offset
+ val offsetAssigner = OffsetAssigner(firstAbsoluteOffset, magic, messageAndOffsets)
+ val timestampType = firstMessageAndOffset.message.timestampType
+
+ val (messageWriter, lastOffset) = writeCompressedMessages(codec, offsetAssigner, magicAndTimestamp,
+ timestampType, messages)
+
+ writeMessage(buffer, messageWriter, lastOffset)
+ messageWriter.size + MessageSet.LogOverhead
+ }
+ }
+
private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
buffer.putLong(offset)
buffer.putInt(message.size)
@@ -177,6 +231,14 @@ object ByteBufferMessageSet {
buffer.putInt(messageWriter.size)
messageWriter.writeTo(buffer)
}
+
+
+ case class FilterResult(messagesRead: Int,
+ bytesRead: Int,
+ messagesRetained: Int,
+ bytesRetained: Int,
+ maxTimestamp: Long,
+ offsetOfMaxTimestamp: Long)
}
private object OffsetAssigner {
@@ -184,6 +246,9 @@ private object OffsetAssigner {
def apply(offsetCounter: LongRef, size: Int): OffsetAssigner =
new OffsetAssigner(offsetCounter.value to offsetCounter.addAndGet(size))
+ def apply(baseOffset: Long, magic: Byte, messageAndOffsets: Seq[MessageAndOffset]): OffsetAssigner =
+ new OffsetAssigner(messageAndOffsets.map(_.offset))
+
}
private class OffsetAssigner(offsets: Seq[Long]) {
@@ -389,6 +454,75 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
}
}
+ def filterInto(buffer: ByteBuffer,
+ filter: MessageAndOffset => Boolean): FilterResult = {
+ var maxTimestamp = Message.NoTimestamp
+ var offsetOfMaxTimestamp = -1L
+ var messagesRead = 0
+ var bytesRead = 0
+ var messagesRetained = 0
+ var bytesRetained = 0
+
+ for (shallowMessageAndOffset <- shallowIterator) {
+ val shallowMessage = shallowMessageAndOffset.message
+ val shallowOffset = shallowMessageAndOffset.offset
+ val size = MessageSet.entrySize(shallowMessageAndOffset.message)
+
+ messagesRead += 1
+ bytesRead += size
+
+ if (shallowMessageAndOffset.message.compressionCodec == NoCompressionCodec) {
+ if (filter(shallowMessageAndOffset)) {
+ ByteBufferMessageSet.writeMessage(buffer, shallowMessage, shallowOffset)
+ messagesRetained += 1
+ bytesRetained += size
+
+ if (shallowMessage.timestamp > maxTimestamp) {
+ maxTimestamp = shallowMessage.timestamp
+ offsetOfMaxTimestamp = shallowOffset
+ }
+ }
+ messagesRead += 1
+ } else {
+ // We use the absolute offset to decide whether to retain the message or not (this is handled by the
+ // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version
+ // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic
+ // of the inner messages. This will be fixed as we recopy the messages to the destination segment.
+
+ var writeOriginalMessageSet = true
+ val retainedMessages = ArrayBuffer[MessageAndOffset]()
+ val shallowMagic = shallowMessage.magic
+
+ for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) {
+ messagesRead += 1
+ if (filter(deepMessageAndOffset)) {
+ // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+ // the corrupted entry with correct data.
+ if (shallowMagic != deepMessageAndOffset.message.magic)
+ writeOriginalMessageSet = false
+
+ retainedMessages += deepMessageAndOffset
+ // We need the max timestamp and last offset for time index
+ if (deepMessageAndOffset.message.timestamp > maxTimestamp)
+ maxTimestamp = deepMessageAndOffset.message.timestamp
+ }
+ else writeOriginalMessageSet = false
+ }
+ offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
+ // There are no messages compacted out and no message format conversion, write the original message set back
+ if (writeOriginalMessageSet)
+ ByteBufferMessageSet.writeMessage(buffer, shallowMessage, shallowOffset)
+ else if (retainedMessages.nonEmpty) {
+ val compressedSize = ByteBufferMessageSet.writeCompressedMessages(buffer, shallowMessage.compressionCodec, retainedMessages)
+ messagesRetained += 1
+ bytesRetained += compressedSize
+ }
+ }
+ }
+
+ FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxTimestamp, offsetOfMaxTimestamp)
+ }
+
/**
* Update the offsets for this message set and do further validation on messages including:
* 1. Messages for compacted topics must have keys
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dd9607f/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 40030cb..250c8b8 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -73,9 +73,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
checkLogAfterAppendingDups(log, startSize, appends)
- log.append(largeMessageSet, assignOffsets = true)
+ val appendInfo = log.append(largeMessageSet, assignOffsets = true)
+ val largeMessageOffset = appendInfo.firstOffset
+
val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
- val appends2 = appends ++ Seq(largeMessageKey -> largeMessageValue) ++ dups
+ val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dups
val firstDirty2 = log.activeSegment.baseOffset
checkLastCleaned("log", 0, firstDirty2)
@@ -98,11 +100,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
logProps.put(LogConfig.CleanupPolicyProp, "compact,delete")
- def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String)]) = {
+ def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String, Long)]) = {
cleaner = makeCleaner(parts = 1, propertyOverrides = logProps, logCleanerBackOffMillis = 100L)
val log = cleaner.logs.get(topics(0))
- val messages: Seq[(Int, String)] = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
+ val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
val startSize = log.size
val firstDirty = log.activeSegment.baseOffset
@@ -115,7 +117,6 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
(log, messages)
}
-
val (log, _) = runCleanerAndCheckCompacted(100)
// should delete old segments
log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs))
@@ -128,10 +129,10 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
// run the cleaner again to make sure if there are no issues post deletion
val (log2, messages) = runCleanerAndCheckCompacted(20)
val read = readFromLog(log2)
- assertEquals("Contents of the map shouldn't change", messages.toMap, read.toMap)
+ assertEquals("Contents of the map shouldn't change", toMap(messages), toMap(read))
}
- // returns (value, ByteBufferMessag eSet)
+ // returns (value, ByteBufferMessageSet)
private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, ByteBufferMessageSet) = {
def messageValue(length: Int): String = {
val random = new Random(0)
@@ -175,15 +176,16 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
checkLogAfterAppendingDups(log, startSize, appends)
- val appends2: Seq[(Int, String)] = {
+ val appends2: Seq[(Int, String, Long)] = {
val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
- log.append(largeMessageSet, assignOffsets = true)
+ val appendInfo = log.append(largeMessageSet, assignOffsets = true)
+ val largeMessageOffset = appendInfo.firstOffset
// also add some messages with version 1 to check that we handle mixed format versions correctly
props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
log.config = new LogConfig(props)
val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V1)
- appends ++ dupsV0 ++ Seq(largeMessageKey -> largeMessageValue) ++ dupsV1
+ appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1
}
val firstDirty2 = log.activeSegment.baseOffset
checkLastCleaned("log", 0, firstDirty2)
@@ -232,18 +234,22 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
// wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than
// LogConfig.MinCleanableDirtyRatioProp
cleaner.awaitCleaned(topic, partitionId, firstDirty)
- val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition(topic, partitionId)).get
+ val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(TopicAndPartition(topic, partitionId))
assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned",
lastCleaned >= firstDirty)
}
- private def checkLogAfterAppendingDups(log: Log, startSize: Long, appends: Seq[(Int, String)]) {
+ private def checkLogAfterAppendingDups(log: Log, startSize: Long, appends: Seq[(Int, String, Long)]) {
val read = readFromLog(log)
- assertEquals("Contents of the map shouldn't change", appends.toMap, read.toMap)
+ assertEquals("Contents of the map shouldn't change", toMap(appends), toMap(read))
assertTrue(startSize > log.size)
}
- private def readFromLog(log: Log): Iterable[(Int, String)] = {
+ private def toMap(messages: Iterable[(Int, String, Long)]): Map[Int, (String, Long)] = {
+ messages.map { case (key, value, offset) => key -> (value, offset) }.toMap
+ }
+
+ private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
def messageIterator(entry: MessageAndOffset): Iterator[MessageAndOffset] =
// create single message iterator or deep iterator depending on compression codec
@@ -253,23 +259,23 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- messageIterator(entry)) yield {
val key = TestUtils.readString(messageAndOffset.message.key).toInt
val value = TestUtils.readString(messageAndOffset.message.payload)
- key -> value
+ (key, value, messageAndOffset.offset)
}
}
private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec,
- startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String)] = {
+ startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String, Long)] = {
for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
val payload = counter.toString
- log.append(TestUtils.singleMessageSet(payload = payload.toString.getBytes, codec = codec,
+ val appendInfo = log.append(TestUtils.singleMessageSet(payload = payload.toString.getBytes, codec = codec,
key = key.toString.getBytes, magicValue = magicValue), assignOffsets = true)
counter += 1
- (key, payload)
+ (key, payload, appendInfo.firstOffset)
}
}
private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec,
- startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String)] = {
+ startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String, Long)] = {
val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
val payload = counter.toString
counter += 1
@@ -281,8 +287,10 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
}
val messageSet = new ByteBufferMessageSet(compressionCodec = codec, messages: _*)
- log.append(messageSet, assignOffsets = true)
- kvs
+ val appendInfo = log.append(messageSet, assignOffsets = true)
+ val offsets = appendInfo.firstOffset to appendInfo.lastOffset
+
+ kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
}
@After