You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/05/20 19:28:07 UTC

kafka git commit: KAFKA-1374; Log cleaner should be able to handle compressed messages; reviewed by Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 29419581f -> bb133c63b


KAFKA-1374; Log cleaner should be able to handle compressed messages; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb133c63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb133c63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb133c63

Branch: refs/heads/trunk
Commit: bb133c63b0645c22f2d9b76393886fb506b14a93
Parents: 2941958
Author: Joel Koshy <jj...@gmail.com>
Authored: Wed May 20 10:27:39 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 20 10:27:39 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 122 +++++++++++++------
 .../kafka/message/ByteBufferMessageSet.scala    |  10 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |  36 +++---
 .../kafka/log/LogCleanerIntegrationTest.scala   |  63 +++++++---
 .../src/test/scala/unit/kafka/log/LogTest.scala |  25 +---
 5 files changed, 158 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bb133c63/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index abea8b2..c9ade72 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -17,21 +17,18 @@
 
 package kafka.log
 
+import java.io.{DataOutputStream, File}
+import java.nio._
+import java.util.Date
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import com.yammer.metrics.core.Gauge
 import kafka.common._
 import kafka.message._
-import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
+import kafka.utils._
 
 import scala.collection._
-import scala.math
-import java.nio._
-import java.util.Date
-import java.io.File
-import java.lang.IllegalStateException
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-
-import com.yammer.metrics.core.Gauge
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
@@ -389,7 +386,6 @@ private[log] class Cleaner(val id: Int,
    * @param map The key=>offset mapping
    * @param retainDeletes Should delete tombstones be retained while cleaning this segment
    *
-   * TODO: Implement proper compression support
    */
   private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment,
                              dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
@@ -403,30 +399,30 @@ private[log] class Cleaner(val id: Int,
       throttler.maybeThrottle(messages.sizeInBytes)
       // check each message to see if it is to be retained
       var messagesRead = 0
-      for (entry <- messages) {
-        messagesRead += 1
+      for (entry <- messages.shallowIterator) {
         val size = MessageSet.entrySize(entry.message)
-        position += size
         stats.readMessage(size)
-        val key = entry.message.key
-        if (key != null) {
-          val foundOffset = map.get(key)
-          /* two cases in which we can get rid of a message:
-           *   1) if there exists a message with the same key but higher offset
-           *   2) if the message is a delete "tombstone" marker and enough time has passed
-           */
-          val redundant = foundOffset >= 0 && entry.offset < foundOffset
-          val obsoleteDelete = !retainDeletes && entry.message.isNull
-          if (!redundant && !obsoleteDelete) {
+        if (entry.message.compressionCodec == NoCompressionCodec) {
+          if (shouldRetainMessage(source, map, retainDeletes, entry)) {
             ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
             stats.recopyMessage(size)
           }
+          messagesRead += 1
         } else {
-          stats.invalidMessage()
+          val messages = ByteBufferMessageSet.deepIterator(entry.message)
+          val retainedMessages = messages.filter(messageAndOffset => {
+            messagesRead += 1
+            shouldRetainMessage(source, map, retainDeletes, messageAndOffset)
+          }).toSeq
+
+          if (retainedMessages.nonEmpty)
+            compressMessages(writeBuffer, entry.message.compressionCodec, retainedMessages)
         }
       }
+
+      position += messages.validBytes
       // if any messages are to be retained, write them out
-      if(writeBuffer.position > 0) {
+      if (writeBuffer.position > 0) {
         writeBuffer.flip()
         val retained = new ByteBufferMessageSet(writeBuffer)
         dest.append(retained.head.offset, retained)
@@ -434,12 +430,62 @@ private[log] class Cleaner(val id: Int,
       }
       
       // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
-      if(readBuffer.limit > 0 && messagesRead == 0)
+      if (readBuffer.limit > 0 && messagesRead == 0)
         growBuffers()
     }
     restoreBuffers()
   }
-  
+
+  private def compressMessages(buffer: ByteBuffer, compressionCodec: CompressionCodec, messages: Seq[MessageAndOffset]) {
+    val messagesIterable = messages.toIterable.map(_.message)
+    if (messages.isEmpty) {
+      MessageSet.Empty.sizeInBytes
+    } else if (compressionCodec == NoCompressionCodec) {
+      for(messageOffset <- messages)
+        ByteBufferMessageSet.writeMessage(buffer, messageOffset.message, messageOffset.offset)
+      MessageSet.messageSetSize(messagesIterable)
+    } else {
+      var offset = -1L
+      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messagesIterable) / 2, 1024), 1 << 16))
+      messageWriter.write(codec = compressionCodec) { outputStream =>
+        val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
+        try {
+          for (messageOffset <- messages) {
+            val message = messageOffset.message
+            offset = messageOffset.offset
+            output.writeLong(offset)
+            output.writeInt(message.size)
+            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+          }
+        } finally {
+          output.close()
+        }
+      }
+      ByteBufferMessageSet.writeMessage(buffer, messageWriter, offset)
+      stats.recopyMessage(messageWriter.size + MessageSet.LogOverhead)
+    }
+  }
+
+  private def shouldRetainMessage(source: kafka.log.LogSegment,
+                                  map: kafka.log.OffsetMap,
+                                  retainDeletes: Boolean,
+                                  entry: kafka.message.MessageAndOffset): Boolean = {
+    val key = entry.message.key
+    if (key != null) {
+      val foundOffset = map.get(key)
+      /* two cases in which we can get rid of a message:
+       *   1) if there exists a message with the same key but higher offset
+       *   2) if the message is a delete "tombstone" marker and enough time has passed
+       */
+      val redundant = foundOffset >= 0 && entry.offset < foundOffset
+      val obsoleteDelete = !retainDeletes && entry.message.isNull
+      !redundant && !obsoleteDelete
+    } else {
+      stats.invalidMessage()
+      false
+    }
+  }
+
   /**
    * Double the I/O buffer capacity
    */
@@ -542,13 +588,14 @@ private[log] class Cleaner(val id: Int,
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
-        val size = MessageSet.entrySize(message)
-        position += size
         if (message.hasKey)
           map.put(message.key, entry.offset)
         offset = entry.offset
-        stats.indexMessage(size)
+        stats.indexMessagesRead(1)
       }
+      position += messages.validBytes
+      stats.indexBytesRead(messages.validBytes)
+
       // if we didn't read even one complete message, our read buffer may be too small
       if(position == startPosition)
         growBuffers()
@@ -580,16 +627,19 @@ private case class CleanerStats(time: Time = SystemTime) {
     messagesWritten += 1
     bytesWritten += size
   }
-  
-  def indexMessage(size: Int) {
-    mapMessagesRead += 1
+
+  def indexMessagesRead(size: Int) {
+    mapMessagesRead += size
+  }
+
+  def indexBytesRead(size: Int) {
     mapBytesRead += size
   }
-  
+
   def indexDone() {
     mapCompleteTime = time.milliseconds
   }
-  
+
   def allDone() {
     endTime = time.milliseconds
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb133c63/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 9dfe914..5a32de8 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -255,13 +255,9 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
       buffer.reset()
       this
     } else {
-      if (compactedTopic && targetCodec != NoCompressionCodec)
-        throw new InvalidMessageException("Compacted topic cannot accept compressed messages. " +
-          "Either the producer sent a compressed message or the topic has been configured with a broker-side compression codec.")
-      // We need to crack open the message-set if any of these are true:
-      // (i) messages are compressed,
-      // (ii) this message-set is sent to a compacted topic (and so we need to verify that each message has a key)
-      // If the broker is configured with a target compression codec then we need to recompress regardless of original codec
+      // We need to deep-iterate over the message-set if any of these are true:
+      // (i) messages are compressed
+      // (ii) the topic is configured with a target compression codec so we need to recompress regardless of original codec
       val messages = this.internalIterator(isShallow = false).map(messageAndOffset => {
         if (compactedTopic && !messageAndOffset.message.hasKey)
           throw new InvalidMessageException("Compacted topic cannot accept message without key.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb133c63/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 8445894..dcbfbe1 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -52,6 +52,11 @@ object TestLogCleaning {
                                .describedAs("count")
                                .ofType(classOf[java.lang.Long])
                                .defaultsTo(Long.MaxValue)
+    val messageCompressionOpt = parser.accepts("compression-type", "message compression type")
+                               .withOptionalArg()
+                               .describedAs("compressionType")
+                               .ofType(classOf[java.lang.String])
+                               .defaultsTo("none")
     val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
                            .withRequiredArg
                            .describedAs("count")
@@ -84,38 +89,39 @@ object TestLogCleaning {
                         .withRequiredArg
                         .describedAs("directory")
                         .ofType(classOf[String])
-    
+
     val options = parser.parse(args:_*)
-    
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "An integration test for log cleaning.")
-    
+
     if(options.has(dumpOpt)) {
       dumpLog(new File(options.valueOf(dumpOpt)))
       System.exit(0)
     }
-    
+
     CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt)
-    
+
     // parse options
     val messages = options.valueOf(numMessagesOpt).longValue
+    val compressionType = options.valueOf(messageCompressionOpt)
     val percentDeletes = options.valueOf(percentDeletesOpt).intValue
     val dups = options.valueOf(numDupsOpt).intValue
     val brokerUrl = options.valueOf(brokerOpt)
     val topicCount = options.valueOf(topicsOpt).intValue
     val zkUrl = options.valueOf(zkConnectOpt)
     val sleepSecs = options.valueOf(sleepSecsOpt).intValue
-    
+
     val testId = new Random().nextInt(Int.MaxValue)
     val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
-    
+
     println("Producing %d messages...".format(messages))
-    val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes)
+    val producedDataFile = produceMessages(brokerUrl, topics, messages, compressionType, dups, percentDeletes)
     println("Sleeping for %d seconds...".format(sleepSecs))
     Thread.sleep(sleepSecs * 1000)
     println("Consuming messages...")
     val consumedDataFile = consumeMessages(zkUrl, topics)
-    
+
     val producedLines = lineCount(producedDataFile)
     val consumedLines = lineCount(consumedDataFile)
     val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
@@ -169,9 +175,9 @@ object TestLogCleaning {
     }
     producedDeduped.close()
     consumedDeduped.close()
+    println("Validated " + total + " values, " + mismatched + " mismatches.")
     require(!produced.hasNext, "Additional values produced not found in consumer log.")
     require(!consumed.hasNext, "Additional values consumed not found in producer log.")
-    println("Validated " + total + " values, " + mismatched + " mismatches.")
     require(mismatched == 0, "Non-zero number of row mismatches.")
     // if all the checks worked out we can delete the deduped files
     producedDedupedFile.delete()
@@ -233,10 +239,11 @@ object TestLogCleaning {
     }.start()
     new BufferedReader(new InputStreamReader(process.getInputStream()), 10*1024*1024)
   }
-  
-  def produceMessages(brokerUrl: String, 
-                      topics: Array[String], 
-                      messages: Long, 
+
+  def produceMessages(brokerUrl: String,
+                      topics: Array[String],
+                      messages: Long,
+                      compressionType: String,
                       dups: Int,
                       percentDeletes: Int): File = {
     val producerProps = new Properties
@@ -244,6 +251,7 @@ object TestLogCleaning {
     producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
     val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb133c63/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 3b5aa9d..471ddff 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,21 +18,28 @@
 package kafka.log
 
 import java.io.File
-import kafka.server.OffsetCheckpoint
 
-import scala.collection._
-import org.junit._
 import kafka.common.TopicAndPartition
-import kafka.utils._
 import kafka.message._
-import org.scalatest.junit.JUnitSuite
-import junit.framework.Assert._
+import kafka.server.OffsetCheckpoint
+import kafka.utils._
+import org.apache.kafka.common.record.CompressionType
+import org.junit.Assert._
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import org.scalatest.junit.JUnit3Suite
+
+import scala.collection._
+
 
 /**
  * This is an integration test that tests the fully integrated log cleaner
  */
-class LogCleanerIntegrationTest extends JUnitSuite {
-  
+@RunWith(value = classOf[Parameterized])
+class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite {
+
   val time = new MockTime()
   val segmentSize = 100
   val deleteDelay = 1000
@@ -40,16 +47,16 @@ class LogCleanerIntegrationTest extends JUnitSuite {
   val logDir = TestUtils.tempDir()
   var counter = 0
   val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
-  
+
   @Test
   def cleanerTest() {
     val cleaner = makeCleaner(parts = 3)
     val log = cleaner.logs.get(topics(0))
 
-    val appends = writeDups(numKeys = 100, numDups = 3, log)
+    val appends = writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec))
     val startSize = log.size
     cleaner.startup()
-    
+
     val lastCleaned = log.activeSegment.baseOffset
     // wait until we clean up to base_offset of active segment - minDirtyMessages
     cleaner.awaitCleaned("log", 0, lastCleaned)
@@ -57,9 +64,9 @@ class LogCleanerIntegrationTest extends JUnitSuite {
     val read = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
     assertTrue(startSize > log.size)
-    
+
     // write some more stuff and validate again
-    val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log)
+    val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec))
     val lastCleaned2 = log.activeSegment.baseOffset
     cleaner.awaitCleaned("log", 0, lastCleaned2)
     val read2 = readFromLog(log)
@@ -79,19 +86,25 @@ class LogCleanerIntegrationTest extends JUnitSuite {
     
     cleaner.shutdown()
   }
-  
+
   def readFromLog(log: Log): Iterable[(Int, Int)] = {
-    for(segment <- log.logSegments; message <- segment.log) yield {
-      val key = TestUtils.readString(message.message.key).toInt
-      val value = TestUtils.readString(message.message.payload).toInt
+    for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- {
+      // create single message iterator or deep iterator depending on compression codec
+      if (entry.message.compressionCodec == NoCompressionCodec)
+        Stream.cons(entry, Stream.empty).iterator
+      else
+        ByteBufferMessageSet.deepIterator(entry.message)
+    }) yield {
+      val key = TestUtils.readString(messageAndOffset.message.key).toInt
+      val value = TestUtils.readString(messageAndOffset.message.payload).toInt
       key -> value
     }
   }
-  
-  def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
+
+  def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec): Seq[(Int, Int)] = {
     for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
-      log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
+      log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes), assignOffsets = true)
       counter += 1
       (key, count)
     }
@@ -128,4 +141,14 @@ class LogCleanerIntegrationTest extends JUnitSuite {
                    time = time)
   }
 
+}
+
+object LogCleanerIntegrationTest {
+  @Parameters
+  def parameters: java.util.Collection[Array[String]] = {
+    val list = new java.util.ArrayList[Array[String]]()
+    for (codec <- CompressionType.values)
+      list.add(Array(codec.name))
+    list
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb133c63/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 76d3bfd..8e095d6 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -337,6 +337,7 @@ class LogTest extends JUnitSuite {
     val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage)
     val messageSetWithOneUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage)
     val messageSetWithCompressedKeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage)
+    val messageSetWithCompressedUnkeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage, unkeyedMessage)
 
     val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage)
     val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage)
@@ -356,8 +357,8 @@ class LogTest extends JUnitSuite {
       case e: InvalidMessageException => // this is good
     }
     try {
-      log.append(messageSetWithCompressedKeyedMessage)
-      fail("Compacted topics cannot accept compressed messages.")
+      log.append(messageSetWithCompressedUnkeyedMessage)
+      fail("Compacted topics cannot accept a message without a key.")
     } catch {
       case e: InvalidMessageException => // this is good
     }
@@ -365,25 +366,7 @@ class LogTest extends JUnitSuite {
     // the following should succeed without any InvalidMessageException
     log.append(messageSetWithKeyedMessage)
     log.append(messageSetWithKeyedMessages)
-
-    // test that a compacted topic with broker-side compression type set to uncompressed can accept compressed messages
-    val uncompressedLog = new Log(logDir, logConfig.copy(compact = true, compressionType = "uncompressed"),
-                                  recoveryPoint = 0L, time.scheduler, time)
-    uncompressedLog.append(messageSetWithCompressedKeyedMessage)
-    uncompressedLog.append(messageSetWithKeyedMessage)
-    uncompressedLog.append(messageSetWithKeyedMessages)
-    try {
-      uncompressedLog.append(messageSetWithUnkeyedMessage)
-      fail("Compacted topics cannot accept a message without a key.")
-    } catch {
-      case e: InvalidMessageException => // this is good
-    }
-    try {
-      uncompressedLog.append(messageSetWithOneUnkeyedMessage)
-      fail("Compacted topics cannot accept a message without a key.")
-    } catch {
-      case e: InvalidMessageException => // this is good
-    }
+    log.append(messageSetWithCompressedKeyedMessage)
   }
 
   /**