You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/12/13 18:41:30 UTC

[3/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 250c8b8..65c2d05 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -22,10 +22,9 @@ import java.util.Properties
 
 import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
 import kafka.common.TopicAndPartition
-import kafka.message._
 import kafka.server.OffsetCheckpoint
 import kafka.utils._
-import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit._
@@ -43,7 +42,7 @@ import scala.util.Random
 @RunWith(value = classOf[Parameterized])
 class LogCleanerIntegrationTest(compressionCodec: String) {
 
-  val codec = CompressionCodec.getCompressionCodec(compressionCodec)
+  val codec = CompressionType.forName(compressionCodec)
   val time = new MockTime()
   val segmentSize = 256
   val deleteDelay = 1000
@@ -56,7 +55,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   @Test
   def cleanerTest() {
     val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Message.MagicValue_V1)
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Record.MAGIC_VALUE_V1)
     val maxMessageSize = largeMessageSet.sizeInBytes
 
     cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
@@ -133,13 +132,13 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   }
 
   // returns (value, ByteBufferMessageSet)
-  private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, ByteBufferMessageSet) = {
+  private def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte): (String, MemoryRecords) = {
     def messageValue(length: Int): String = {
       val random = new Random(0)
       new String(random.alphanumeric.take(length).toArray)
     }
     val value = messageValue(128)
-    val messageSet = TestUtils.singleMessageSet(payload = value.getBytes, codec = codec, key = key.toString.getBytes,
+    val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes,
       magicValue = messageFormatVersion)
     (value, messageSet)
   }
@@ -147,9 +146,9 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   @Test
   def testCleanerWithMessageFormatV0(): Unit = {
     val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Message.MagicValue_V0)
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Record.MAGIC_VALUE_V0)
     val maxMessageSize = codec match {
-      case NoCompressionCodec => largeMessageSet.sizeInBytes
+      case CompressionType.NONE => largeMessageSet.sizeInBytes
       case _ =>
         // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
         // increase because the broker offsets are larger than the ones assigned by the client
@@ -165,7 +164,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
     log.config = new LogConfig(props)
 
-    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
+    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
     val startSize = log.size
     cleaner.startup()
 
@@ -177,14 +176,14 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     checkLogAfterAppendingDups(log, startSize, appends)
 
     val appends2: Seq[(Int, String, Long)] = {
-      val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
+      val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
       val appendInfo = log.append(largeMessageSet, assignOffsets = true)
       val largeMessageOffset = appendInfo.firstOffset
 
       // also add some messages with version 1 to check that we handle mixed format versions correctly
       props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
       log.config = new LogConfig(props)
-      val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V1)
+      val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
       appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1
     }
     val firstDirty2 = log.activeSegment.baseOffset
@@ -205,15 +204,15 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
     // with compression enabled, these messages will be written as a single message containing
     // all of the individual messages
-    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = Message.MagicValue_V0)
-    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V0)
+    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
+    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
 
     props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
     log.config = new LogConfig(props)
 
-    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1)
-    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1)
-    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Message.MagicValue_V1)
+    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
 
     val appends = appendsV0 ++ appendsV1
 
@@ -250,32 +249,27 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   }
 
   private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
-
-    def messageIterator(entry: MessageAndOffset): Iterator[MessageAndOffset] =
-      // create single message iterator or deep iterator depending on compression codec
-      if (entry.message.compressionCodec == NoCompressionCodec) Iterator(entry)
-      else ByteBufferMessageSet.deepIterator(entry)
-
-    for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- messageIterator(entry)) yield {
-      val key = TestUtils.readString(messageAndOffset.message.key).toInt
-      val value = TestUtils.readString(messageAndOffset.message.payload)
-      (key, value, messageAndOffset.offset)
+    import JavaConverters._
+    for (segment <- log.logSegments; deepLogEntry <- segment.log.deepIterator.asScala) yield {
+      val key = TestUtils.readString(deepLogEntry.record.key).toInt
+      val value = TestUtils.readString(deepLogEntry.record.value)
+      (key, value, deepLogEntry.offset)
     }
   }
 
-  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec,
-                        startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String, Long)] = {
+  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
+                        startKey: Int = 0, magicValue: Byte = Record.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
     for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
-      val payload = counter.toString
-      val appendInfo = log.append(TestUtils.singleMessageSet(payload = payload.toString.getBytes, codec = codec,
+      val value = counter.toString
+      val appendInfo = log.append(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
         key = key.toString.getBytes, magicValue = magicValue), assignOffsets = true)
       counter += 1
-      (key, payload, appendInfo.firstOffset)
+      (key, value, appendInfo.firstOffset)
     }
   }
 
-  private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec,
-                                        startKey: Int = 0, magicValue: Byte = Message.CurrentMagicValue): Seq[(Int, String, Long)] = {
+  private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
+                                        startKey: Int = 0, magicValue: Byte = Record.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
     val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val payload = counter.toString
       counter += 1
@@ -283,11 +277,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     }
 
     val messages = kvs.map { case (key, payload) =>
-      new Message(payload.toString.getBytes, key.toString.getBytes, Message.NoTimestamp, magicValue)
+      Record.create(magicValue, key.toString.getBytes, payload.toString.getBytes)
     }
 
-    val messageSet = new ByteBufferMessageSet(compressionCodec = codec, messages: _*)
-    val appendInfo = log.append(messageSet, assignOffsets = true)
+    val records = MemoryRecords.withRecords(codec, messages: _*)
+    val appendInfo = log.append(records, assignOffsets = true)
     val offsets = appendInfo.firstOffset to appendInfo.lastOffset
 
     kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 5e029fc..abab3bf 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.util.Properties
 
 import kafka.common.TopicAndPartition
-import kafka.message._
 import kafka.utils._
 import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.utils.Utils
@@ -33,7 +32,6 @@ import org.junit.runners.Parameterized.Parameters
 
 import scala.collection._
 
-
 /**
   * This is an integration test that tests the fully integrated log cleaner
   */
@@ -52,7 +50,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
   val logDir = TestUtils.tempDir()
   var counter = 0
   val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
-  val compressionCodec = CompressionCodec.getCompressionCodec(compressionCodecName)
+  val compressionCodec = CompressionType.forName(compressionCodecName)
 
   @Test
   def cleanerTest(): Unit = {
@@ -96,7 +94,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
 
     val compactedSize = log.logSegments(0L, activeSegAtT0.baseOffset).map(_.size).sum
     debug(s"after cleaning the compacted size up to active segment at T0: $compactedSize")
-    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(TopicAndPartition("log", 0))
     assertTrue(s"log cleaner should have processed up to offset $firstBlock1SegmentBaseOffset, but lastCleaned=$lastCleaned", lastCleaned >= firstBlock1SegmentBaseOffset)
     assertTrue(s"log should have been compacted: size up to offset of active segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize",
       sizeUpToActiveSegmentAtT0 > compactedSize)
@@ -106,23 +104,19 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
   }
 
   private def readFromLog(log: Log): Iterable[(Int, Int)] = {
-    for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- {
-      // create single message iterator or deep iterator depending on compression codec
-      if (entry.message.compressionCodec == NoCompressionCodec)
-        Stream.cons(entry, Stream.empty).iterator
-      else
-        ByteBufferMessageSet.deepIterator(entry)
-    }) yield {
-      val key = TestUtils.readString(messageAndOffset.message.key).toInt
-      val value = TestUtils.readString(messageAndOffset.message.payload).toInt
+    import JavaConverters._
+
+    for (segment <- log.logSegments; logEntry <- segment.log.deepIterator.asScala) yield {
+      val key = TestUtils.readString(logEntry.record.key).toInt
+      val value = TestUtils.readString(logEntry.record.value).toInt
       key -> value
     }
   }
 
-  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, timestamp: Long): Seq[(Int, Int)] = {
+  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long): Seq[(Int, Int)] = {
     for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
-      log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
+      log.append(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
       counter += 1
       (key, count)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 0cd52d6..5dfa268 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -21,8 +21,8 @@ import java.io.File
 import java.util.Properties
 
 import kafka.common._
-import kafka.message._
 import kafka.utils._
+import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit.{After, Test}
@@ -54,8 +54,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     */
   @Test
   def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs(): Unit = {
-    val messageSet = TestUtils.singleMessageSet("test".getBytes)
-    val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Delete)
+    val records = TestUtils.singletonRecords("test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     val readyToDelete = cleanerManager.deletableLogs().size
@@ -67,8 +67,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     */
   @Test
   def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactDeleteLogs(): Unit = {
-    val messageSet = TestUtils.singleMessageSet("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     val readyToDelete = cleanerManager.deletableLogs().size
@@ -81,8 +81,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     */
   @Test
   def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyCompactLogs(): Unit = {
-    val messageSet = TestUtils.singleMessageSet("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Compact)
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     val readyToDelete = cleanerManager.deletableLogs().size
@@ -100,7 +100,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     while(log.numberOfSegments < 8)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds))
+      log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds))
 
     val topicAndPartition = TopicAndPartition("log", 0)
     val lastClean = Map(topicAndPartition-> 0L)
@@ -123,7 +123,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     val t0 = time.milliseconds
     while(log.numberOfSegments < 4)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
+      log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
 
     val activeSegAtT0 = log.activeSegment
 
@@ -131,7 +131,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val t1 = time.milliseconds
 
     while (log.numberOfSegments < 8)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1))
+      log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1))
 
     val topicAndPartition = TopicAndPartition("log", 0)
     val lastClean = Map(topicAndPartition-> 0L)
@@ -155,7 +155,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     val t0 = time.milliseconds
     while (log.numberOfSegments < 8)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
+      log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
 
     time.sleep(compactionLag + 1)
 
@@ -192,10 +192,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
-  private def message(key: Int, value: Int, timestamp: Long) =
-    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
-      bytes = value.toString.getBytes,
-      timestamp = timestamp,
-      magicValue = Message.MagicValue_V1))
+  private def logEntries(key: Int, value: Int, timestamp: Long) =
+    MemoryRecords.withRecords(Record.create(timestamp, key.toString.getBytes, value.toString.getBytes))
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index d80fba1..a99d4b9 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -17,21 +17,21 @@
 
 package kafka.log
 
-import java.io.{DataOutputStream, File}
+import java.io.File
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
 
 import kafka.common._
-import kafka.message._
 import kafka.utils._
-import org.apache.kafka.common.record.{MemoryRecords, TimestampType}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit.{After, Test}
 import org.scalatest.junit.JUnitSuite
 
 import scala.collection._
+import JavaConverters._
 
 /**
  * Unit tests for the log cleaning logic
@@ -66,7 +66,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // append messages to the log until we have four segments
     while(log.numberOfSegments < 4)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
     val keysFound = keysInLog(log)
     assertEquals(0L until log.logEndOffset, keysFound)
 
@@ -100,7 +100,7 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     while(log.numberOfSegments < 2)
-      log.append(message(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)))
+      log.append(record(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)))
     val keysFound = keysInLog(log)
     assertEquals(0L until log.logEndOffset, keysFound)
 
@@ -123,23 +123,23 @@ class LogCleanerTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
 
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-    
+
     // append messages with the keys 0 through N
     while(log.numberOfSegments < 2)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-      
+      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
     // delete all even keys between 0 and N
     val leo = log.logEndOffset
     for(key <- 0 until leo.toInt by 2)
-      log.append(deleteMessage(key))
-      
+      log.append(tombstoneRecord(key))
+
     // append some new unique keys to pad out to a new active segment
     while(log.numberOfSegments < 4)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-      
+      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
     cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
     val keys = keysInLog(log).toSet
-    assertTrue("None of the keys we deleted should still exist.", 
+    assertTrue("None of the keys we deleted should still exist.",
                (0 until leo.toInt by 2).forall(!keys.contains(_)))
   }
 
@@ -151,11 +151,11 @@ class LogCleanerTest extends JUnitSuite {
 
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
-    log.append(message(0,0)) // offset 0
-    log.append(message(1,1)) // offset 1
-    log.append(message(0,0)) // offset 2
-    log.append(message(1,1)) // offset 3
-    log.append(message(0,0)) // offset 4
+    log.append(record(0,0)) // offset 0
+    log.append(record(1,1)) // offset 1
+    log.append(record(0,0)) // offset 2
+    log.append(record(1,1)) // offset 3
+    log.append(record(0,0)) // offset 4
     // roll the segment, so we can clean the messages already appended
     log.roll()
 
@@ -180,11 +180,11 @@ class LogCleanerTest extends JUnitSuite {
 
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
-    log.append(message(0,0)) // offset 0
-    log.append(message(1,1)) // offset 1
-    log.append(message(0,0)) // offset 2
-    log.append(message(1,1)) // offset 3
-    log.append(message(0,0)) // offset 4
+    log.append(record(0,0)) // offset 0
+    log.append(record(1,1)) // offset 1
+    log.append(record(0,0)) // offset 2
+    log.append(record(1,1)) // offset 3
+    log.append(record(0,0)) // offset 4
     // roll the segment, so we can clean the messages already appended
     log.roll()
 
@@ -218,18 +218,18 @@ class LogCleanerTest extends JUnitSuite {
 
     // append messages with the keys 0 through N-1, values equal offset
     while(log.numberOfSegments <= numCleanableSegments)
-      log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+      log.append(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
 
     // at this point one message past the cleanable segments has been added
     // the entire segment containing the first uncleanable offset should not be cleaned.
     val firstUncleanableOffset = log.logEndOffset + 1  // +1  so it is past the baseOffset
 
     while(log.numberOfSegments < numTotalSegments - 1)
-      log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+      log.append(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
 
     // the last (active) segment has just one message
 
-    def distinctValuesBySegment = log.logSegments.map(s => s.log.map(m => TestUtils.readString(m.message.payload)).toSet.size).toSeq
+    def distinctValuesBySegment = log.logSegments.map(s => s.log.shallowIterator.asScala.map(m => TestUtils.readString(m.record.value)).toSet.size).toSeq
 
     val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
     assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.",
@@ -253,7 +253,7 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     // create 6 segments with only one message in each segment
-    val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+    val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
       log.append(messageSet, assignOffsets = true)
 
@@ -271,7 +271,7 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     // create 6 segments with only one message in each segment
-    val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+    val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
       log.append(messageSet, assignOffsets = true)
 
@@ -305,14 +305,14 @@ class LogCleanerTest extends JUnitSuite {
 
     // append unkeyed messages
     while(log.numberOfSegments < 2)
-      log.append(unkeyedMessage(log.logEndOffset.toInt))
+      log.append(unkeyedRecord(log.logEndOffset.toInt))
     val numInvalidMessages = unkeyedMessageCountInLog(log)
 
     val sizeWithUnkeyedMessages = log.size
 
     // append keyed messages
     while(log.numberOfSegments < 3)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
 
     val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
     val (_, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
@@ -321,17 +321,17 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size)
     assertEquals("Cleaner should have seen %d invalid messages.", numInvalidMessages, stats.invalidMessagesRead)
   }
-  
+
   /* extract all the keys from a log */
   def keysInLog(log: Log): Iterable[Int] =
-    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt))
+    log.logSegments.flatMap(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => TestUtils.readString(m.record.key).toInt))
 
   /* extract all the offsets from a log */
   def offsetsInLog(log: Log): Iterable[Long] =
-    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => m.offset))
+    log.logSegments.flatMap(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => m.offset))
 
   def unkeyedMessageCountInLog(log: Log) =
-    log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
+    log.logSegments.map(s => s.log.shallowIterator.asScala.filter(!_.record.hasNullValue).count(m => !m.record.hasKey)).sum
 
   def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = {
     throw new LogCleaningAbortedException()
@@ -350,7 +350,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // append messages to the log until we have four segments
     while(log.numberOfSegments < 4)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
 
     val keys = keysInLog(log)
     val map = new FakeOffsetMap(Int.MaxValue)
@@ -371,20 +371,20 @@ class LogCleanerTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-    
+
     // append some messages to the log
     var i = 0
     while(log.numberOfSegments < 10) {
-      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+      log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
       i += 1
     }
-    
+
     // grouping by very large values should result in a single group with all the segments in it
     var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
     assertEquals(1, groups.size)
     assertEquals(log.numberOfSegments, groups.head.size)
     checkSegmentOrder(groups)
-    
+
     // grouping by very small values should result in all groups having one entry
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
     assertEquals(log.numberOfSegments, groups.size)
@@ -396,20 +396,20 @@ class LogCleanerTest extends JUnitSuite {
     checkSegmentOrder(groups)
 
     val groupSize = 3
-    
+
     // check grouping by log size
     val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
     checkSegmentOrder(groups)
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
-    
+
     // check grouping by index size
     val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
     checkSegmentOrder(groups)
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
   }
-  
+
   /**
    * Validate the logic for grouping log segments together for cleaning when only a small number of
    * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
@@ -425,47 +425,45 @@ class LogCleanerTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-       
+
     // fill up first segment
     while (log.numberOfSegments == 1)
-      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
-    
+      log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
+
     // forward offset and append message to next segment at offset Int.MaxValue
-    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(Int.MaxValue - 1),
-      new Message("hello".getBytes, "hello".getBytes, Message.NoTimestamp, Message.MagicValue_V1))
-    log.append(messageSet, assignOffsets = false)
-    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+    val records = MemoryRecords.withLogEntries(LogEntry.create(Int.MaxValue - 1, Record.create("hello".getBytes, "hello".getBytes)))
+    log.append(records, assignOffsets = false)
+    log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
     assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
-    
+
     // grouping should result in a single group with maximum relative offset of Int.MaxValue
     var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
     assertEquals(1, groups.size)
-    
+
     // append another message, making last offset of second segment > Int.MaxValue
-    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
-    
+    log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
+
     // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
     assertEquals(2, groups.size)
     checkSegmentOrder(groups)
-    
+
     // append more messages, creating new segments, further grouping should still occur
     while (log.numberOfSegments < 4)
-      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+      log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
 
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
     assertEquals(log.numberOfSegments - 1, groups.size)
     for (group <- groups)
       assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
     checkSegmentOrder(groups)
-    
   }
-  
+
   private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
     val offsets = groups.flatMap(_.map(_.baseOffset))
     assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)
   }
-  
+
   /**
    * Test building an offset map off the log
    */
@@ -496,8 +494,7 @@ class LogCleanerTest extends JUnitSuite {
     checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt)
     checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
   }
-  
-  
+
   /**
    * Tests recovery if broker crashes at the following stages during the cleaning sequence
    * <ol>
@@ -516,8 +513,8 @@ class LogCleanerTest extends JUnitSuite {
     logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer)
 
     val config = LogConfig.fromProps(logConfig.originals, logProps)
-      
-    def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = {   
+
+    def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = {
       // Recover log file and check that after recovery, keys are as expected
       // and all temporary files have been deleted
       val recoveredLog = makeLog(config = config)
@@ -530,25 +527,25 @@ class LogCleanerTest extends JUnitSuite {
       assertEquals(expectedKeys, keysInLog(recoveredLog))
       recoveredLog
     }
-    
+
     // create a log and append some messages
     var log = makeLog(config = config)
     var messageCount = 0
     while(log.numberOfSegments < 10) {
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
       messageCount += 1
     }
     val allKeys = keysInLog(log)
-    
+
     // pretend we have odd-numbered keys
     val offsetMap = new FakeOffsetMap(Int.MaxValue)
     for (k <- 1 until messageCount by 2)
       offsetMap.put(key(k), Long.MaxValue)
-     
+
     // clean the log
     cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
     var cleanedKeys = keysInLog(log)
-    
+
     // 1) Simulate recovery just after .cleaned file is created, before rename to .swap
     //    On recovery, clean operation is aborted. All messages should be present in the log
     log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
@@ -556,44 +553,44 @@ class LogCleanerTest extends JUnitSuite {
       Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
     }
     log = recoverAndCheck(config, allKeys)
-    
+
     // clean again
     cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
     cleanedKeys = keysInLog(log)
-    
+
     // 2) Simulate recovery just after swap file is created, before old segment files are
-    //    renamed to .deleted. Clean operation is resumed during recovery. 
+    //    renamed to .deleted. Clean operation is resumed during recovery.
     log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
     for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
       Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
-    }   
+    }
     log = recoverAndCheck(config, cleanedKeys)
 
     // add some more messages and clean the log again
     while(log.numberOfSegments < 10) {
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
       messageCount += 1
     }
     for (k <- 1 until messageCount by 2)
-      offsetMap.put(key(k), Long.MaxValue)    
+      offsetMap.put(key(k), Long.MaxValue)
     cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
     cleanedKeys = keysInLog(log)
-    
+
     // 3) Simulate recovery after swap file is created and old segment files are renamed
     //    to .deleted. Clean operation is resumed during recovery.
     log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
     log = recoverAndCheck(config, cleanedKeys)
-    
+
     // add some more messages and clean the log again
     while(log.numberOfSegments < 10) {
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      log.append(record(log.logEndOffset.toInt, log.logEndOffset.toInt))
       messageCount += 1
     }
     for (k <- 1 until messageCount by 2)
-      offsetMap.put(key(k), Long.MaxValue)    
+      offsetMap.put(key(k), Long.MaxValue)
     cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
     cleanedKeys = keysInLog(log)
-    
+
     // 4) Simulate recovery after swap is complete, but async deletion
     //    is not yet complete. Clean operation is resumed during recovery.
     recoverAndCheck(config, cleanedKeys)
@@ -631,11 +628,11 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog()
     val cleaner = makeCleaner(2)
 
-    log.append(message(0,0))
-    log.append(message(1,1))
-    log.append(message(2,2))
-    log.append(message(3,3))
-    log.append(message(4,4))
+    log.append(record(0,0))
+    log.append(record(1,1))
+    log.append(record(2,2))
+    log.append(record(3,3))
+    log.append(record(4,4))
     log.roll()
 
     val stats = new CleanerStats()
@@ -653,7 +650,7 @@ class LogCleanerTest extends JUnitSuite {
    */
   @Test
   def testCleanCorruptMessageSet() {
-    val codec = SnappyCompressionCodec
+    val codec = CompressionType.GZIP
 
     val logProps = new Properties()
     logProps.put(LogConfig.CompressionTypeProp, codec.name)
@@ -682,10 +679,10 @@ class LogCleanerTest extends JUnitSuite {
 
     cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
 
-    for (segment <- log.logSegments; shallowMessage <- segment.log.iterator; deepMessage <- ByteBufferMessageSet.deepIterator(shallowMessage)) {
-      assertEquals(shallowMessage.message.magic, deepMessage.message.magic)
-      val value = TestUtils.readString(deepMessage.message.payload).toLong
-      assertEquals(deepMessage.offset, value)
+    for (segment <- log.logSegments; shallowLogEntry <- segment.log.shallowIterator.asScala; deepLogEntry <- shallowLogEntry.asScala) {
+      assertEquals(shallowLogEntry.record.magic, deepLogEntry.record.magic)
+      val value = TestUtils.readString(deepLogEntry.record.value).toLong
+      assertEquals(deepLogEntry.offset, value)
     }
   }
 
@@ -704,7 +701,7 @@ class LogCleanerTest extends JUnitSuite {
     val corruptedMessage = invalidCleanedMessage(offset, set)
     val records = MemoryRecords.readableRecords(corruptedMessage.buffer)
 
-    for (logEntry <- records.iterator.asScala) {
+    for (logEntry <- records.deepIterator.asScala) {
       val offset = logEntry.offset
       val value = TestUtils.readString(logEntry.record.value).toLong
       assertEquals(offset, value)
@@ -718,94 +715,64 @@ class LogCleanerTest extends JUnitSuite {
 
   private def invalidCleanedMessage(initialOffset: Long,
                                     keysAndValues: Iterable[(Int, Int)],
-                                    codec: CompressionCodec = SnappyCompressionCodec): ByteBufferMessageSet = {
+                                    codec: CompressionType = CompressionType.GZIP): MemoryRecords = {
     // this function replicates the old versions of the cleaner which under some circumstances
     // would write invalid compressed message sets with the outer magic set to 1 and the inner
     // magic set to 0
-
-    val messages = keysAndValues.map(kv =>
-      new Message(key = kv._1.toString.getBytes,
-        bytes = kv._2.toString.getBytes,
-        timestamp = Message.NoTimestamp,
-        magicValue = Message.MagicValue_V0))
-
-    val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
-    var lastOffset = initialOffset
-
-    messageWriter.write(
-      codec = codec,
-      timestamp = Message.NoTimestamp,
-      timestampType = TimestampType.CREATE_TIME,
-      magicValue = Message.MagicValue_V1) { outputStream =>
-
-      val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream))
-      try {
-        for (message <- messages) {
-          val innerOffset = lastOffset - initialOffset
-          output.writeLong(innerOffset)
-          output.writeInt(message.size)
-          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
-          lastOffset += 1
-        }
-      } finally {
-        output.close()
-      }
+    val records = keysAndValues.map(kv =>
+      Record.create(Record.MAGIC_VALUE_V0,
+        Record.NO_TIMESTAMP,
+        kv._1.toString.getBytes,
+        kv._2.toString.getBytes))
+
+    val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16))
+    val builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, codec, TimestampType.CREATE_TIME)
+
+    var offset = initialOffset
+    records.foreach { record =>
+      builder.appendUnchecked(offset, record)
+      offset += 1
     }
-    val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
-    ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1)
-    buffer.rewind()
 
-    new ByteBufferMessageSet(buffer)
+    builder.build()
   }
 
   private def messageWithOffset(key: Int, value: Int, offset: Long) =
-    new ByteBufferMessageSet(NoCompressionCodec, Seq(offset),
-                             new Message(key = key.toString.getBytes,
-                                         bytes = value.toString.getBytes,
-                                         timestamp = Message.NoTimestamp,
-                                         magicValue = Message.MagicValue_V1))
-  
-  
+    MemoryRecords.withLogEntries(LogEntry.create(offset, Record.create(key.toString.getBytes, value.toString.getBytes)))
+
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
   def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */  }
 
   def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) =
-    new Cleaner(id = 0, 
-                offsetMap = new FakeOffsetMap(capacity), 
+    new Cleaner(id = 0,
+                offsetMap = new FakeOffsetMap(capacity),
                 ioBufferSize = maxMessageSize,
                 maxIoBufferSize = maxMessageSize,
                 dupBufferLoadFactor = 0.75,
-                throttler = throttler, 
+                throttler = throttler,
                 time = time,
                 checkDone = checkDone )
-  
+
   def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
     for((key, value) <- seq)
-      yield log.append(message(key, value)).firstOffset
+      yield log.append(record(key, value)).firstOffset
   }
-  
+
   def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
-  
-  def message(key: Int, value: Int): ByteBufferMessageSet =
-    message(key, value.toString.getBytes)
-
-  def message(key: Int, value: Array[Byte]) =
-    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
-                                         bytes = value,
-                                         timestamp = Message.NoTimestamp,
-                                         magicValue = Message.MagicValue_V1))
-
-  def unkeyedMessage(value: Int) =
-    new ByteBufferMessageSet(new Message(bytes = value.toString.getBytes))
-
-  def deleteMessage(key: Int) =
-    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
-                                         bytes = null,
-                                         timestamp = Message.NoTimestamp,
-                                         magicValue = Message.MagicValue_V1))
-  
+
+  def record(key: Int, value: Int): MemoryRecords =
+    record(key, value.toString.getBytes)
+
+  def record(key: Int, value: Array[Byte]) =
+    MemoryRecords.withRecords(Record.create(key.toString.getBytes, value))
+
+  def unkeyedRecord(value: Int) =
+    MemoryRecords.withRecords(Record.create(value.toString.getBytes))
+
+  def tombstoneRecord(key: Int) = record(key, null)
+
 }
 
 class FakeOffsetMap(val slots: Int) extends OffsetMap {

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 5421da9..40e6228 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -67,7 +67,7 @@ class LogManagerTest {
     val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
     val logFile = new File(logDir, name + "-0")
     assertTrue(logFile.exists)
-    log.append(TestUtils.singleMessageSet("test".getBytes()))
+    log.append(TestUtils.singletonRecords("test".getBytes()))
   }
 
   /**
@@ -89,7 +89,7 @@ class LogManagerTest {
     val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
     var offset = 0L
     for(_ <- 0 until 200) {
-      val set = TestUtils.singleMessageSet("test".getBytes())
+      val set = TestUtils.singletonRecords("test".getBytes())
       val info = log.append(set)
       offset = info.lastOffset
     }
@@ -101,7 +101,7 @@ class LogManagerTest {
     assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
     assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
-    assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes)
 
     try {
       log.read(0, 1024)
@@ -110,7 +110,7 @@ class LogManagerTest {
       case _: OffsetOutOfRangeException => // This is good.
     }
     // log should still be appendable
-    log.append(TestUtils.singleMessageSet("test".getBytes()))
+    log.append(TestUtils.singletonRecords("test".getBytes()))
   }
 
   /**
@@ -118,7 +118,7 @@ class LogManagerTest {
    */
   @Test
   def testCleanupSegmentsToMaintainSize() {
-    val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
+    val setSize = TestUtils.singletonRecords("test".getBytes()).sizeInBytes
     logManager.shutdown()
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 10 * setSize: java.lang.Integer)
@@ -135,7 +135,7 @@ class LogManagerTest {
     // add a bunch of messages that should be larger than the retentionSize
     val numMessages = 200
     for (_ <- 0 until numMessages) {
-      val set = TestUtils.singleMessageSet("test".getBytes())
+      val set = TestUtils.singletonRecords("test".getBytes())
       val info = log.append(set)
       offset = info.firstOffset
     }
@@ -147,7 +147,7 @@ class LogManagerTest {
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
     assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
-    assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
@@ -155,7 +155,7 @@ class LogManagerTest {
       case _: OffsetOutOfRangeException => // This is good.
     }
     // log should still be appendable
-    log.append(TestUtils.singleMessageSet("test".getBytes()))
+    log.append(TestUtils.singletonRecords("test".getBytes()))
   }
 
   /**
@@ -169,7 +169,7 @@ class LogManagerTest {
     val log = logManager.createLog(TopicAndPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
     var offset = 0L
     for (_ <- 0 until 200) {
-      val set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
+      val set = TestUtils.singletonRecords("test".getBytes(), key="test".getBytes())
       val info = log.append(set)
       offset = info.lastOffset
     }
@@ -198,7 +198,7 @@ class LogManagerTest {
     val log = logManager.createLog(TopicAndPartition(name, 0), config)
     val lastFlush = log.lastFlushTime
     for (_ <- 0 until 200) {
-      val set = TestUtils.singleMessageSet("test".getBytes())
+      val set = TestUtils.singletonRecords("test".getBytes())
       log.append(set)
     }
     time.sleep(logManager.InitialTaskDelayMs)
@@ -280,7 +280,7 @@ class LogManagerTest {
     val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig))
     logs.foreach(log => {
       for (_ <- 0 until 50)
-        log.append(TestUtils.singleMessageSet("test".getBytes()))
+        log.append(TestUtils.singletonRecords("test".getBytes()))
 
       log.flush()
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index f02c5cb..d99981a 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -16,15 +16,13 @@
  */
  package kafka.log
 
-import org.junit.Assert._
-import java.util.concurrent.atomic._
-
-import kafka.common.LongRef
-import org.junit.{After, Test}
 import kafka.utils.TestUtils
-import kafka.message._
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record}
 import org.apache.kafka.common.utils.Time
+import org.junit.Assert._
+import org.junit.{After, Test}
 
+import scala.collection.JavaConverters._
 import scala.collection._
 
 class LogSegmentTest {
@@ -34,7 +32,7 @@ class LogSegmentTest {
   /* create a segment with the given base offset */
   def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
     val msFile = TestUtils.tempFile()
-    val ms = new FileMessageSet(msFile)
+    val ms = FileRecords.open(msFile)
     val idxFile = TestUtils.tempFile()
     val timeIdxFile = TestUtils.tempFile()
     idxFile.delete()
@@ -47,12 +45,10 @@ class LogSegmentTest {
   }
   
   /* create a MemoryRecords instance for the given record values starting from the given offset */
-  def messages(offset: Long, messages: String*): ByteBufferMessageSet = {
-    new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, 
-                             offsetCounter = new LongRef(offset),
-                             messages = messages.map(s => new Message(s.getBytes, offset * 10, Message.MagicValue_V1)):_*)
+  def records(offset: Long, records: String*): MemoryRecords = {
+    MemoryRecords.withRecords(offset, records.map(s => Record.create(Record.MAGIC_VALUE_V1, offset * 10, s.getBytes)):_*)
   }
-  
+
   @After
   def teardown() {
     for(seg <- segments) {
@@ -60,7 +56,7 @@ class LogSegmentTest {
       seg.log.delete()
     }
   }
-  
+
   /**
    * A read on an empty log segment should return null
    */
@@ -70,7 +66,7 @@ class LogSegmentTest {
     val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None)
     assertNull("Read beyond the last offset in the segment should be null", read)
   }
-  
+
   /**
    * Reading from before the first offset in the segment should return messages
    * beginning with the first message in the segment
@@ -78,12 +74,12 @@ class LogSegmentTest {
   @Test
   def testReadBeforeFirstOffset() {
     val seg = createSegment(40)
-    val ms = messages(50, "hello", "there", "little", "bee")
-    seg.append(50, Message.NoTimestamp, -1L, ms)
-    val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).messageSet
-    assertEquals(ms.toList, read.toList)
+    val ms = records(50, "hello", "there", "little", "bee")
+    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+    val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records
+    assertEquals(ms.deepIterator.asScala.toList, read.deepIterator.asScala.toList)
   }
-  
+
   /**
    * If we set the startOffset and maxOffset for the read to be the same value
    * we should get only the first message in the log
@@ -92,28 +88,28 @@ class LogSegmentTest {
   def testMaxOffset() {
     val baseOffset = 50
     val seg = createSegment(baseOffset)
-    val ms = messages(baseOffset, "hello", "there", "beautiful")
-    seg.append(baseOffset, Message.NoTimestamp, -1L, ms)
-    def validate(offset: Long) = 
-      assertEquals(ms.filter(_.offset == offset).toList, 
-                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).messageSet.toList)
+    val ms = records(baseOffset, "hello", "there", "beautiful")
+    seg.append(baseOffset, Record.NO_TIMESTAMP, -1L, ms)
+    def validate(offset: Long) =
+      assertEquals(ms.deepIterator.asScala.filter(_.offset == offset).toList,
+                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.deepIterator.asScala.toList)
     validate(50)
     validate(51)
     validate(52)
   }
-  
+
   /**
    * If we read from an offset beyond the last offset in the segment we should get null
    */
   @Test
   def testReadAfterLast() {
     val seg = createSegment(40)
-    val ms = messages(50, "hello", "there")
-    seg.append(50, Message.NoTimestamp, -1L, ms)
+    val ms = records(50, "hello", "there")
+    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
     val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
     assertNull("Read beyond the last offset in the segment should give null", read)
   }
-  
+
   /**
    * If we read from an offset which doesn't exist we should get a message set beginning
    * with the least offset greater than the given startOffset.
@@ -121,14 +117,14 @@ class LogSegmentTest {
   @Test
   def testReadFromGap() {
     val seg = createSegment(40)
-    val ms = messages(50, "hello", "there")
-    seg.append(50, Message.NoTimestamp, -1L, ms)
-    val ms2 = messages(60, "alpha", "beta")
-    seg.append(60, Message.NoTimestamp, -1L, ms2)
+    val ms = records(50, "hello", "there")
+    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+    val ms2 = records(60, "alpha", "beta")
+    seg.append(60, Record.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.toList, read.messageSet.toList)
+    assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
   }
-  
+
   /**
    * In a loop append two messages then truncate off the second of those messages and check that we can read
    * the first but not the second message.
@@ -138,18 +134,18 @@ class LogSegmentTest {
     val seg = createSegment(40)
     var offset = 40
     for (_ <- 0 until 30) {
-      val ms1 = messages(offset, "hello")
-      seg.append(offset, Message.NoTimestamp, -1L, ms1)
-      val ms2 = messages(offset + 1, "hello")
-      seg.append(offset + 1, Message.NoTimestamp, -1L, ms2)
+      val ms1 = records(offset, "hello")
+      seg.append(offset, Record.NO_TIMESTAMP, -1L, ms1)
+      val ms2 = records(offset + 1, "hello")
+      seg.append(offset + 1, Record.NO_TIMESTAMP, -1L, ms2)
       // check that we can read back both messages
       val read = seg.read(offset, None, 10000)
-      assertEquals(List(ms1.head, ms2.head), read.messageSet.toList)
+      assertEquals(List(ms1.deepIterator.next(), ms2.deepIterator.next()), read.records.deepIterator.asScala.toList)
       // now truncate off the last message
       seg.truncateTo(offset + 1)
       val read2 = seg.read(offset, None, 10000)
-      assertEquals(1, read2.messageSet.size)
-      assertEquals(ms1.head, read2.messageSet.head)
+      assertEquals(1, read2.records.deepIterator.asScala.size)
+      assertEquals(ms1.deepIterator.next(), read2.records.deepIterator.next())
       offset += 1
     }
   }
@@ -157,10 +153,10 @@ class LogSegmentTest {
   @Test
   def testReloadLargestTimestampAfterTruncation() {
     val numMessages = 30
-    val seg = createSegment(40, 2 * messages(0, "hello").sizeInBytes - 1)
+    val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
     var offset = 40
     for (_ <- 0 until numMessages) {
-      seg.append(offset, offset, offset, messages(offset, "hello"))
+      seg.append(offset, offset, offset, records(offset, "hello"))
       offset += 1
     }
     val expectedNumEntries = numMessages / 2 - 1
@@ -179,10 +175,10 @@ class LogSegmentTest {
   def testTruncateFull() {
     // test the case where we fully truncate the log
     val seg = createSegment(40)
-    seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there"))
+    seg.append(40, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
     seg.truncateTo(0)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
-    seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there"))
+    seg.append(40, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
   }
 
   /**
@@ -190,11 +186,11 @@ class LogSegmentTest {
    */
   @Test
   def testFindOffsetByTimestamp() {
-    val messageSize = messages(0, s"msg00").sizeInBytes
+    val messageSize = records(0, s"msg00").sizeInBytes
     val seg = createSegment(40, messageSize * 2 - 1)
     // Produce some messages
     for (i <- 40 until 50)
-      seg.append(i, i * 10, i, messages(i, s"msg$i"))
+      seg.append(i, i * 10, i, records(i, s"msg$i"))
 
     assertEquals(490, seg.largestTimestamp)
     // Search for an indexed timestamp
@@ -218,10 +214,10 @@ class LogSegmentTest {
   def testNextOffsetCalculation() {
     val seg = createSegment(40)
     assertEquals(40, seg.nextOffset)
-    seg.append(50, Message.NoTimestamp, -1L, messages(50, "hello", "there", "you"))
+    seg.append(50, Record.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
     assertEquals(53, seg.nextOffset())
   }
-  
+
   /**
    * Test that we can change the file suffixes for the log and index files
    */
@@ -236,7 +232,7 @@ class LogSegmentTest {
     assertTrue(seg.log.file.exists)
     assertTrue(seg.index.file.exists)
   }
-  
+
   /**
    * Create a segment with some data and an index. Then corrupt the index,
    * and recover the segment, the entries should all be readable.
@@ -245,12 +241,12 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
+      seg.append(i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
     val indexFile = seg.index.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100)
-      assertEquals(i, seg.read(i, Some(i + 1), 1024).messageSet.head.offset)
+      assertEquals(i, seg.read(i, Some(i + 1), 1024).records.deepIterator.next().offset)
   }
 
   /**
@@ -261,7 +257,7 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptTimeIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, i * 10, i, messages(i, i.toString))
+      seg.append(i, i * 10, i, records(i, i.toString))
     val timeIndexFile = seg.timeIndex.file
     TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
     seg.recover(64*1024)
@@ -271,7 +267,7 @@ class LogSegmentTest {
         assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get.offset)
     }
   }
-  
+
   /**
    * Randomly corrupt a log a number of times and attempt recovery.
    */
@@ -281,13 +277,15 @@ class LogSegmentTest {
     for (_ <- 0 until 10) {
       val seg = createSegment(0)
       for(i <- 0 until messagesAppended)
-        seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
+        seg.append(i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
       val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the way to the end
-      val position = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)._1.position + TestUtils.random.nextInt(15)
-      TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position)
+
+      val recordPosition = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)
+      val position = recordPosition.position + TestUtils.random.nextInt(15)
+      TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
       seg.recover(64*1024)
-      assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)
+      assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.shallowIterator.asScala.map(_.offset).toList)
       seg.delete()
     }
   }
@@ -304,12 +302,12 @@ class LogSegmentTest {
   @Test
   def testCreateWithInitFileSizeAppendMessage() {
     val seg = createSegment(40, false, 512*1024*1024, true)
-    val ms = messages(50, "hello", "there")
-    seg.append(50, Message.NoTimestamp, -1L, ms)
-    val ms2 = messages(60, "alpha", "beta")
-    seg.append(60, Message.NoTimestamp, -1L, ms2)
+    val ms = records(50, "hello", "there")
+    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+    val ms2 = records(60, "alpha", "beta")
+    seg.append(60, Record.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.toList, read.messageSet.toList)
+    assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
   }
 
   /* create a segment with   pre allocate and clearly shut down*/
@@ -318,12 +316,12 @@ class LogSegmentTest {
     val tempDir = TestUtils.tempDir()
     val seg = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, false, 512*1024*1024, true)
 
-    val ms = messages(50, "hello", "there")
-    seg.append(50, Message.NoTimestamp, -1L, ms)
-    val ms2 = messages(60, "alpha", "beta")
-    seg.append(60, Message.NoTimestamp, -1L, ms2)
+    val ms = records(50, "hello", "there")
+    seg.append(50, Record.NO_TIMESTAMP, -1L, ms)
+    val ms2 = records(60, "alpha", "beta")
+    seg.append(60, Record.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.toList, read.messageSet.toList)
+    assertEquals(ms2.deepIterator.asScala.toList, read.records.deepIterator.asScala.toList)
     val oldSize = seg.log.sizeInBytes()
     val oldPosition = seg.log.channel.position
     val oldFileSize = seg.log.file.length
@@ -336,7 +334,7 @@ class LogSegmentTest {
     segments += segReopen
 
     val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.toList, readAgain.messageSet.toList)
+    assertEquals(ms2.deepIterator.asScala.toList, readAgain.records.deepIterator.asScala.toList)
     val size = segReopen.log.sizeInBytes()
     val position = segReopen.log.channel.position
     val fileSize = segReopen.log.file.length