Posted to commits@kafka.apache.org by gu...@apache.org on 2015/05/20 19:28:07 UTC
kafka git commit: KAFKA-1374; Log cleaner should be able to handle compressed messages; reviewed by Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 29419581f -> bb133c63b
KAFKA-1374; Log cleaner should be able to handle compressed messages; reviewed by Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb133c63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb133c63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb133c63
Branch: refs/heads/trunk
Commit: bb133c63b0645c22f2d9b76393886fb506b14a93
Parents: 2941958
Author: Joel Koshy <jj...@gmail.com>
Authored: Wed May 20 10:27:39 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 20 10:27:39 2015 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogCleaner.scala | 122 +++++++++++++------
.../kafka/message/ByteBufferMessageSet.scala | 10 +-
.../scala/kafka/tools/TestLogCleaning.scala | 36 +++---
.../kafka/log/LogCleanerIntegrationTest.scala | 63 +++++++---
.../src/test/scala/unit/kafka/log/LogTest.scala | 25 +---
5 files changed, 158 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bb133c63/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index abea8b2..c9ade72 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -17,21 +17,18 @@
package kafka.log
+import java.io.{DataOutputStream, File}
+import java.nio._
+import java.util.Date
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import com.yammer.metrics.core.Gauge
import kafka.common._
import kafka.message._
-import kafka.utils._
import kafka.metrics.KafkaMetricsGroup
+import kafka.utils._
import scala.collection._
-import scala.math
-import java.nio._
-import java.util.Date
-import java.io.File
-import java.lang.IllegalStateException
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-
-import com.yammer.metrics.core.Gauge
/**
* The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
@@ -389,7 +386,6 @@ private[log] class Cleaner(val id: Int,
* @param map The key=>offset mapping
* @param retainDeletes Should delete tombstones be retained while cleaning this segment
*
- * TODO: Implement proper compression support
*/
private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment,
dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
@@ -403,30 +399,30 @@ private[log] class Cleaner(val id: Int,
throttler.maybeThrottle(messages.sizeInBytes)
// check each message to see if it is to be retained
var messagesRead = 0
- for (entry <- messages) {
- messagesRead += 1
+ for (entry <- messages.shallowIterator) {
val size = MessageSet.entrySize(entry.message)
- position += size
stats.readMessage(size)
- val key = entry.message.key
- if (key != null) {
- val foundOffset = map.get(key)
- /* two cases in which we can get rid of a message:
- * 1) if there exists a message with the same key but higher offset
- * 2) if the message is a delete "tombstone" marker and enough time has passed
- */
- val redundant = foundOffset >= 0 && entry.offset < foundOffset
- val obsoleteDelete = !retainDeletes && entry.message.isNull
- if (!redundant && !obsoleteDelete) {
+ if (entry.message.compressionCodec == NoCompressionCodec) {
+ if (shouldRetainMessage(source, map, retainDeletes, entry)) {
ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
stats.recopyMessage(size)
}
+ messagesRead += 1
} else {
- stats.invalidMessage()
+ val messages = ByteBufferMessageSet.deepIterator(entry.message)
+ val retainedMessages = messages.filter(messageAndOffset => {
+ messagesRead += 1
+ shouldRetainMessage(source, map, retainDeletes, messageAndOffset)
+ }).toSeq
+
+ if (retainedMessages.nonEmpty)
+ compressMessages(writeBuffer, entry.message.compressionCodec, retainedMessages)
}
}
+
+ position += messages.validBytes
// if any messages are to be retained, write them out
- if(writeBuffer.position > 0) {
+ if (writeBuffer.position > 0) {
writeBuffer.flip()
val retained = new ByteBufferMessageSet(writeBuffer)
dest.append(retained.head.offset, retained)
@@ -434,12 +430,62 @@ private[log] class Cleaner(val id: Int,
}
// if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
- if(readBuffer.limit > 0 && messagesRead == 0)
+ if (readBuffer.limit > 0 && messagesRead == 0)
growBuffers()
}
restoreBuffers()
}
-
+
+ private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, messages: Seq[MessageAndOffset]) {
+ val messagesIterable = messages.toIterable.map(_.message)
+ if (messages.isEmpty) {
+ MessageSet.Empty.sizeInBytes
+ } else if (compressionCodec == NoCompressionCodec) {
+ for(messageOffset <- messages)
+ ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, messageOffset.offset)
+ MessageSet.messageSetSize(messagesIterable)
+ } else {
+ var offset = -1L
+ val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messagesIterable) / 2, 1024), 1 << 16))
+ messageWriter.write(codec = compressionCodec) { outputStream =>
+ val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
+ try {
+ for (messageOffset <- messages) {
+ val message = messageOffset.message
+ offset = messageOffset.offset
+ output.writeLong(offset)
+ output.writeInt(message.size)
+ output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+ }
+ } finally {
+ output.close()
+ }
+ }
+ ByteBufferMessageSet.writeMessage(buffer, messageWriter, offset)
+ stats.recopyMessage(messageWriter.size + MessageSet.LogOverhead)
+ }
+ }
+
+ private def shouldRetainMessage(source: kafka.log.LogSegment,
+ map: kafka.log.OffsetMap,
+ retainDeletes: Boolean,
+ entry: kafka.message.MessageAndOffset): Boolean = {
+ val key = entry.message.key
+ if (key != null) {
+ val foundOffset = map.get(key)
+ /* two cases in which we can get rid of a message:
+ * 1) if there exists a message with the same key but higher offset
+ * 2) if the message is a delete "tombstone" marker and enough time has passed
+ */
+ val redundant = foundOffset >= 0 && entry.offset < foundOffset
+ val obsoleteDelete = !retainDeletes && entry.message.isNull
+ !redundant && !obsoleteDelete
+ } else {
+ stats.invalidMessage()
+ false
+ }
+ }
+
/**
* Double the I/O buffer capacity
*/
@@ -542,13 +588,14 @@ private[log] class Cleaner(val id: Int,
val startPosition = position
for (entry <- messages) {
val message = entry.message
- val size = MessageSet.entrySize(message)
- position += size
if (message.hasKey)
map.put(message.key, entry.offset)
offset = entry.offset
- stats.indexMessage(size)
+ stats.indexMessagesRead(1)
}
+ position += messages.validBytes
+ stats.indexBytesRead(messages.validBytes)
+
// if we didn't read even one complete message, our read buffer may be too small
if(position == startPosition)
growBuffers()
@@ -580,16 +627,19 @@ private case class CleanerStats(time: Time = SystemTime) {
messagesWritten += 1
bytesWritten += size
}
-
- def indexMessage(size: Int) {
- mapMessagesRead += 1
+
+ def indexMessagesRead(size: Int) {
+ mapMessagesRead += size
+ }
+
+ def indexBytesRead(size: Int) {
mapBytesRead += size
}
-
+
def indexDone() {
mapCompleteTime = time.milliseconds
}
-
+
def allDone() {
endTime = time.milliseconds
}
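
For context, the retention rule that both the uncompressed path and the
deep-iteration path above delegate to shouldRetainMessage can be summarized in
isolation. A self-contained sketch, not Kafka code; Record, latest and
shouldRetain are illustrative names:

    case class Record(key: String, offset: Long, isTombstone: Boolean)

    def shouldRetain(latest: Map[String, Long], retainDeletes: Boolean)(r: Record): Boolean = {
      // redundant: a message with the same key exists at a higher offset
      val redundant = latest.get(r.key).exists(r.offset < _)
      // obsolete delete: a tombstone whose retention window has passed
      val obsoleteDelete = !retainDeletes && r.isTombstone
      !redundant && !obsoleteDelete
    }

    val records = Seq(
      Record("a", offset = 0, isTombstone = false),
      Record("a", offset = 5, isTombstone = false),
      Record("b", offset = 3, isTombstone = true))
    val latest = records.groupBy(_.key).map { case (k, rs) => k -> rs.map(_.offset).max }
    records.filter(shouldRetain(latest, retainDeletes = false))
    // => Seq(Record("a", 5, false)): "a"@0 is redundant, "b"@3 is an obsolete delete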
http://git-wip-us.apache.org/repos/asf/kafka/blob/bb133c63/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 9dfe914..5a32de8 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -255,13 +255,9 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
buffer.reset()
this
} else {
- if (compactedTopic && targetCodec != NoCompressionCodec)
- throw new InvalidMessageException("Compacted topic cannot accept compressed messages. " +
- "Either the producer sent a compressed message or the topic has been configured with a broker-side compression codec.")
- // We need to crack open the message-set if any of these are true:
- // (i) messages are compressed,
- // (ii) this message-set is sent to a compacted topic (and so we need to verify that each message has a key)
- // If the broker is configured with a target compression codec then we need to recompress regardless of original codec
+ // We need to deep-iterate over the message-set if any of these are true:
+ // (i) messages are compressed
+ // (ii) the topic is configured with a target compression codec so we need to recompress regardless of original codec
val messages = this.internalIterator(isShallow = false).map(messageAndOffset => {
if (compactedTopic && !messageAndOffset.message.hasKey)
throw new InvalidMessageException("Compacted topic cannot accept message without key.")
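
The comment change above captures the new behavior: a compressed message-set
sent to a compacted topic is no longer rejected outright; instead each inner
message is checked for a key while the set is deep-iterated. A minimal sketch
of that per-message check with illustrative types (a plain exception stands in
for Kafka's InvalidMessageException):

    case class Inner(key: Option[String], value: Option[String])

    def checkKeys(compactedTopic: Boolean, messages: Iterator[Inner]): Iterator[Inner] =
      messages.map { m =>
        if (compactedTopic && m.key.isEmpty)
          throw new IllegalArgumentException("Compacted topic cannot accept message without key.")
        m
      }
    // Note the iterator is lazy: the check fires as messages are consumed
    // (and, in the real code, the survivors are recompressed with the target codec).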
http://git-wip-us.apache.org/repos/asf/kafka/blob/bb133c63/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 8445894..dcbfbe1 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -52,6 +52,11 @@ object TestLogCleaning {
.describedAs("count")
.ofType(classOf[java.lang.Long])
.defaultsTo(Long.MaxValue)
+ val messageCompressionOpt = parser.accepts("compression-type", "message compression type")
+ .withOptionalArg()
+ .describedAs("compressionType")
+ .ofType(classOf[java.lang.String])
+ .defaultsTo("none")
val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
.withRequiredArg
.describedAs("count")
@@ -84,38 +89,39 @@ object TestLogCleaning {
.withRequiredArg
.describedAs("directory")
.ofType(classOf[String])
-
+
val options = parser.parse(args:_*)
-
+
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "An integration test for log cleaning.")
-
+
if(options.has(dumpOpt)) {
dumpLog(new File(options.valueOf(dumpOpt)))
System.exit(0)
}
-
+
CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt)
-
+
// parse options
val messages = options.valueOf(numMessagesOpt).longValue
+ val compressionType = options.valueOf(messageCompressionOpt)
val percentDeletes = options.valueOf(percentDeletesOpt).intValue
val dups = options.valueOf(numDupsOpt).intValue
val brokerUrl = options.valueOf(brokerOpt)
val topicCount = options.valueOf(topicsOpt).intValue
val zkUrl = options.valueOf(zkConnectOpt)
val sleepSecs = options.valueOf(sleepSecsOpt).intValue
-
+
val testId = new Random().nextInt(Int.MaxValue)
val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
-
+
println("Producing %d messages...".format(messages))
- val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes)
+ val producedDataFile = produceMessages(brokerUrl, topics, messages, compressionType, dups, percentDeletes)
println("Sleeping for %d seconds...".format(sleepSecs))
Thread.sleep(sleepSecs * 1000)
println("Consuming messages...")
val consumedDataFile = consumeMessages(zkUrl, topics)
-
+
val producedLines = lineCount(producedDataFile)
val consumedLines = lineCount(consumedDataFile)
val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
@@ -169,9 +175,9 @@ object TestLogCleaning {
}
producedDeduped.close()
consumedDeduped.close()
+ println("Validated " + total + " values, " + mismatched + " mismatches.")
require(!produced.hasNext, "Additional values produced not found in consumer log.")
require(!consumed.hasNext, "Additional values consumed not found in producer log.")
- println("Validated " + total + " values, " + mismatched + " mismatches.")
require(mismatched == 0, "Non-zero number of row mismatches.")
// if all the checks worked out we can delete the deduped files
producedDedupedFile.delete()
@@ -233,10 +239,11 @@ object TestLogCleaning {
}.start()
new BufferedReader(new InputStreamReader(process.getInputStream()), 10*1024*1024)
}
-
- def produceMessages(brokerUrl: String,
- topics: Array[String],
- messages: Long,
+
+ def produceMessages(brokerUrl: String,
+ topics: Array[String],
+ messages: Long,
+ compressionType: String,
dups: Int,
percentDeletes: Int): File = {
val producerProps = new Properties
@@ -244,6 +251,7 @@ object TestLogCleaning {
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+ producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
val rand = new Random(1)
val keyCount = (messages / dups).toInt
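
For reference, the new compression-type option threads straight through to the
standard producer property used above; roughly as follows (a sketch, with a
placeholder broker address):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

    val props = new Properties()
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip") // "none", "gzip", "snappy", ...
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)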
http://git-wip-us.apache.org/repos/asf/kafka/blob/bb133c63/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 3b5aa9d..471ddff 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,21 +18,28 @@
package kafka.log
import java.io.File
-import kafka.server.OffsetCheckpoint
-import scala.collection._
-import org.junit._
import kafka.common.TopicAndPartition
-import kafka.utils._
import kafka.message._
-import org.scalatest.junit.JUnitSuite
-import junit.framework.Assert._
+import kafka.server.OffsetCheckpoint
+import kafka.utils._
+import org.apache.kafka.common.record.CompressionType
+import org.junit.Assert._
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import org.scalatest.junit.JUnit3Suite
+
+import scala.collection._
+
/**
* This is an integration test that tests the fully integrated log cleaner
*/
-class LogCleanerIntegrationTest extends JUnitSuite {
-
+@RunWith(value = classOf[Parameterized])
+class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite {
+
val time = new MockTime()
val segmentSize = 100
val deleteDelay = 1000
@@ -40,16 +47,16 @@ class LogCleanerIntegrationTest extends JUnitSuite {
val logDir = TestUtils.tempDir()
var counter = 0
val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
-
+
@Test
def cleanerTest() {
val cleaner = makeCleaner(parts = 3)
val log = cleaner.logs.get(topics(0))
- val appends = writeDups(numKeys = 100, numDups = 3, log)
+ val appends = writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec))
val startSize = log.size
cleaner.startup()
-
+
val lastCleaned = log.activeSegment.baseOffset
// wait until we clean up to base_offset of active segment - minDirtyMessages
cleaner.awaitCleaned("log", 0, lastCleaned)
@@ -57,9 +64,9 @@ class LogCleanerIntegrationTest extends JUnitSuite {
val read = readFromLog(log)
assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
assertTrue(startSize > log.size)
-
+
// write some more stuff and validate again
- val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log)
+ val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec))
val lastCleaned2 = log.activeSegment.baseOffset
cleaner.awaitCleaned("log", 0, lastCleaned2)
val read2 = readFromLog(log)
@@ -79,19 +86,25 @@ class LogCleanerIntegrationTest extends JUnitSuite {
cleaner.shutdown()
}
-
+
def readFromLog(log: Log): Iterable[(Int, Int)] = {
- for(segment <- log.logSegments; message <- segment.log) yield {
- val key = TestUtils.readString(message.message.key).toInt
- val value = TestUtils.readString(message.message.payload).toInt
+ for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- {
+ // create single message iterator or deep iterator depending on compression codec
+ if (entry.message.compressionCodec == NoCompressionCodec)
+ Stream.cons(entry, Stream.empty).iterator
+ else
+ ByteBufferMessageSet.deepIterator(entry.message)
+ }) yield {
+ val key = TestUtils.readString(messageAndOffset.message.key).toInt
+ val value = TestUtils.readString(messageAndOffset.message.payload).toInt
key -> value
}
}
-
- def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
+
+ def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec): Seq[(Int, Int)] = {
for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
val count = counter
- log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
+ log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes), assignOffsets = true)
counter += 1
(key, count)
}
@@ -128,4 +141,14 @@ class LogCleanerIntegrationTest extends JUnitSuite {
time = time)
}
+}
+
+object LogCleanerIntegrationTest {
+ @Parameters
+ def parameters: java.util.Collection[Array[String]] = {
+ val list = new java.util.ArrayList[Array[String]]()
+ for (codec <- CompressionType.values)
+ list.add(Array(codec.name))
+ list
+ }
}
\ No newline at end of file
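
The parameterization above is the standard JUnit 4 pattern: the Parameterized
runner instantiates the test class once per row returned by the static
@Parameters method (supplied here via the Scala companion object). A
stripped-down sketch of the same wiring, with a hypothetical test class:

    import org.junit.Test
    import org.junit.runner.RunWith
    import org.junit.runners.Parameterized
    import org.junit.runners.Parameterized.Parameters

    @RunWith(classOf[Parameterized])
    class CodecParamTest(codecName: String) {
      @Test
      def runsOncePerCodec(): Unit = {
        // one instance of this class is constructed per parameter row
        println("running with codec: " + codecName)
      }
    }

    object CodecParamTest {
      @Parameters
      def parameters: java.util.Collection[Array[String]] =
        java.util.Arrays.asList(Array("none"), Array("gzip"), Array("snappy"), Array("lz4"))
    }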
http://git-wip-us.apache.org/repos/asf/kafka/blob/bb133c63/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 76d3bfd..8e095d6 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -337,6 +337,7 @@ class LogTest extends JUnitSuite {
val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage)
val messageSetWithOneUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage)
val messageSetWithCompressedKeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage)
+ val messageSetWithCompressedUnkeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage, unkeyedMessage)
val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage)
val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage)
@@ -356,8 +357,8 @@ class LogTest extends JUnitSuite {
case e: InvalidMessageException => // this is good
}
try {
- log.append(messageSetWithCompressedKeyedMessage)
- fail("Compacted topics cannot accept compressed messages.")
+ log.append(messageSetWithCompressedUnkeyedMessage)
+ fail("Compacted topics cannot accept a message without a key.")
} catch {
case e: InvalidMessageException => // this is good
}
@@ -365,25 +366,7 @@ class LogTest extends JUnitSuite {
// the following should succeed without any InvalidMessageException
log.append(messageSetWithKeyedMessage)
log.append(messageSetWithKeyedMessages)
-
- // test that a compacted topic with broker-side compression type set to uncompressed can accept compressed messages
- val uncompressedLog = new Log(logDir, logConfig.copy(compact = true, compressionType = "uncompressed"),
- recoveryPoint = 0L, time.scheduler, time)
- uncompressedLog.append(messageSetWithCompressedKeyedMessage)
- uncompressedLog.append(messageSetWithKeyedMessage)
- uncompressedLog.append(messageSetWithKeyedMessages)
- try {
- uncompressedLog.append(messageSetWithUnkeyedMessage)
- fail("Compacted topics cannot accept a message without a key.")
- } catch {
- case e: InvalidMessageException => // this is good
- }
- try {
- uncompressedLog.append(messageSetWithOneUnkeyedMessage)
- fail("Compacted topics cannot accept a message without a key.")
- } catch {
- case e: InvalidMessageException => // this is good
- }
+ log.append(messageSetWithCompressedKeyedMessage)
}
/**