You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/24 19:43:55 UTC

[02/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 5825ab7..c87e927 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -18,16 +18,18 @@
 package kafka.log
 
 import java.io._
+import java.nio.ByteBuffer
 import java.util.Properties
 
-import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
+import org.apache.kafka.common.errors._
 import kafka.api.ApiVersion
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils._
 import kafka.server.KafkaConfig
-import org.apache.kafka.common.record._
+import org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP
+import org.apache.kafka.common.record.{RecordBatch, _}
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -113,7 +115,7 @@ class LogTest extends JUnitSuite {
 
     val numSegments = log.numberOfSegments
     time.sleep(log.config.segmentMs + 1)
-    log.append(MemoryRecords.withLogEntries())
+    log.append(MemoryRecords.withRecords(CompressionType.NONE))
     assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
@@ -127,7 +129,7 @@ class LogTest extends JUnitSuite {
     val maxJitter = 20 * 60L
 
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60: java.lang.Long)
+    logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
     logProps.put(LogConfig.SegmentJitterMsProp, maxJitter: java.lang.Long)
     // create a log
     val log = new Log(logDir,
@@ -193,17 +195,19 @@ class LogTest extends JUnitSuite {
     // We use need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
-    val records = (0 until 100 by 2).map(id => Record.create(time.milliseconds, null, id.toString.getBytes)).toArray
+    val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
-    for(i <- records.indices)
-      log.append(MemoryRecords.withRecords(records(i)))
+    for(value <- values)
+      log.append(TestUtils.singletonRecords(value = value))
 
-    for(i <- records.indices) {
-      val read = log.read(i, 100, Some(i+1)).records.shallowEntries.iterator.next()
-      assertEquals("Offset read should match order appended.", i, read.offset)
-      assertEquals("Message should match appended.", records(i), read.record)
+    for(i <- values.indices) {
+      val read = log.read(i, 100, Some(i+1)).records.batches.iterator.next()
+      assertEquals("Offset read should match order appended.", i, read.lastOffset)
+      val actual = read.iterator.next()
+      assertNull("Key should be null", actual.key)
+      assertEquals("Values not equal", ByteBuffer.wrap(values(i)), actual.value)
     }
-    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(records.length, 100, None).records.shallowEntries.asScala.size)
+    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(values.length, 100, None).records.batches.asScala.size)
   }
 
   /**
@@ -213,19 +217,19 @@ class LogTest extends JUnitSuite {
   @Test
   def testAppendAndReadWithNonSequentialOffsets() {
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
     val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
-    val records = messageIds.map(id => Record.create(time.milliseconds, null, id.toString.getBytes))
+    val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
     // now test the case that we give the offsets and use non-sequential offsets
     for(i <- records.indices)
-      log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
+      log.append(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)), assignOffsets = false)
     for(i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
-      val read = log.read(i, 100, None).records.shallowEntries.iterator.next()
+      val read = log.read(i, 100, None).records.records.iterator.next()
       assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
-      assertEquals("Message should match appended.", records(idx), read.record)
+      assertEquals("Message should match appended.", records(idx), new SimpleRecord(read))
     }
   }
 
@@ -243,26 +247,26 @@ class LogTest extends JUnitSuite {
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
-      log.append(MemoryRecords.withRecords(Record.create(time.milliseconds, null, "42".getBytes)))
+      log.append(TestUtils.singletonRecords(value = "42".getBytes))
 
     // now manually truncate off all but one message from the first segment to create a gap in the messages
     log.logSegments.head.truncateTo(1)
 
     assertEquals("A read should now return the last message in the log", log.logEndOffset - 1,
-      log.read(1, 200, None).records.shallowEntries.iterator.next().offset)
+      log.read(1, 200, None).records.batches.iterator.next().lastOffset)
   }
 
   @Test
   def testReadWithMinMessage() {
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
     val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
-    val records = messageIds.map(id => Record.create(time.milliseconds, null, id.toString.getBytes))
+    val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
     // now test the case that we give the offsets and use non-sequential offsets
     for (i <- records.indices)
-      log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
+      log.append(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)), assignOffsets = false)
 
     for (i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
@@ -270,28 +274,27 @@ class LogTest extends JUnitSuite {
         log.read(i, 1, minOneMessage = true),
         log.read(i, 100, minOneMessage = true),
         log.read(i, 100, Some(10000), minOneMessage = true)
-      ).map(_.records.shallowEntries.iterator.next())
+      ).map(_.records.records.iterator.next())
       reads.foreach { read =>
         assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
-        assertEquals("Message should match appended.", records(idx), read.record)
+        assertEquals("Message should match appended.", records(idx), new SimpleRecord(read))
       }
 
-      assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.shallowEntries.asScala.toIndexedSeq)
+      assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.batches.asScala.toIndexedSeq)
     }
-
   }
 
   @Test
   def testReadWithTooSmallMaxLength() {
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
     val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
-    val records = messageIds.map(id => Record.create(time.milliseconds, null, id.toString.getBytes))
+    val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
     // now test the case that we give the offsets and use non-sequential offsets
     for (i <- records.indices)
-      log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
+      log.append(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)), assignOffsets = false)
 
     for (i <- 50 until messageIds.max) {
       assertEquals(MemoryRecords.EMPTY, log.read(i, 0).records)
@@ -322,7 +325,7 @@ class LogTest extends JUnitSuite {
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
-    log.append(MemoryRecords.withRecords(Record.create(time.milliseconds, null, "42".getBytes)))
+    log.append(TestUtils.singletonRecords(value = "42".getBytes))
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
 
@@ -362,15 +365,19 @@ class LogTest extends JUnitSuite {
     /* do successive reads to ensure all our messages are there */
     var offset = 0L
     for(i <- 0 until numMessages) {
-      val messages = log.read(offset, 1024*1024).records.shallowEntries
+      val messages = log.read(offset, 1024*1024).records.batches
       val head = messages.iterator.next()
-      assertEquals("Offsets not equal", offset, head.offset)
-      assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowEntries.iterator.next().record,
-        head.record.convert(messageSets(i).shallowEntries.iterator.next().record.magic, TimestampType.NO_TIMESTAMP_TYPE))
-      offset = head.offset + 1
+      assertEquals("Offsets not equal", offset, head.lastOffset)
+
+      val expected = messageSets(i).records.iterator.next()
+      val actual = head.iterator.next()
+      assertEquals(s"Keys not equal at offset $offset", expected.key, actual.key)
+      assertEquals(s"Values not equal at offset $offset", expected.value, actual.value)
+      assertEquals(s"Timestamps not equal at offset $offset", expected.timestamp, actual.timestamp)
+      offset = head.lastOffset + 1
     }
     val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records
-    assertEquals("Should be no more messages", 0, lastRead.shallowEntries.asScala.size)
+    assertEquals("Should be no more messages", 0, lastRead.records.asScala.size)
 
     // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure
     TestUtils.retry(1000L){
@@ -385,24 +392,20 @@ class LogTest extends JUnitSuite {
   def testCompressedMessages() {
     /* this log should roll after every messageset */
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
-    log.append(MemoryRecords.withRecords(CompressionType.GZIP,
-                                         Record.create(time.milliseconds, null, "hello".getBytes),
-                                         Record.create(time.milliseconds, null, "there".getBytes)))
-    log.append(MemoryRecords.withRecords(CompressionType.GZIP,
-                                         Record.create(time.milliseconds, null, "alpha".getBytes),
-                                         Record.create(time.milliseconds, null, "beta".getBytes)))
+    log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
+    log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("alpha".getBytes), new SimpleRecord("beta".getBytes)))
 
-    def read(offset: Int) = log.read(offset, 4096).records.deepEntries.iterator
+    def read(offset: Int) = log.read(offset, 4096).records.records
 
     /* we should always get the first message in the compressed set when reading any offset in the set */
-    assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
-    assertEquals("Read at offset 1 should produce 0", 0, read(1).next().offset)
-    assertEquals("Read at offset 2 should produce 2", 2, read(2).next().offset)
-    assertEquals("Read at offset 3 should produce 2", 2, read(3).next().offset)
+    assertEquals("Read at offset 0 should produce 0", 0, read(0).iterator.next().offset)
+    assertEquals("Read at offset 1 should produce 0", 0, read(1).iterator.next().offset)
+    assertEquals("Read at offset 2 should produce 2", 2, read(2).iterator.next().offset)
+    assertEquals("Read at offset 3 should produce 2", 2, read(3).iterator.next().offset)
   }
 
   /**
@@ -445,8 +448,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testMessageSetSizeCheck() {
-    val messageSet = MemoryRecords.withRecords(Record.create(time.milliseconds, null, "You".getBytes),
-                                               Record.create(time.milliseconds, null, "bethe".getBytes))
+    val messageSet = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes))
     // append messages to log
     val configSegmentSize = messageSet.sizeInBytes - 1
     val logProps = new Properties()
@@ -465,9 +467,9 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testCompactedTopicConstraints() {
-    val keyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, time.milliseconds, "and here it is".getBytes, "this message has a key".getBytes)
-    val anotherKeyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, time.milliseconds, "another key".getBytes, "this message also has a key".getBytes)
-    val unkeyedMessage = Record.create("this message does not have a key".getBytes)
+    val keyedMessage = new SimpleRecord("and here it is".getBytes, "this message has a key".getBytes)
+    val anotherKeyedMessage = new SimpleRecord("another key".getBytes, "this message also has a key".getBytes)
+    val unkeyedMessage = new SimpleRecord("this message does not have a key".getBytes)
 
     val messageSetWithUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage, keyedMessage)
     val messageSetWithOneUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage)
@@ -513,11 +515,10 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testMessageSizeCheck() {
-    val first = MemoryRecords.withRecords(CompressionType.NONE,
-                                          Record.create(time.milliseconds, null, "You".getBytes),
-                                          Record.create(time.milliseconds, null, "bethe".getBytes))
+    val first = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes))
     val second = MemoryRecords.withRecords(CompressionType.NONE,
-                                           Record.create(time.milliseconds, null, "change (I need more bytes)".getBytes))
+      new SimpleRecord("change (I need more bytes)... blah blah blah.".getBytes),
+      new SimpleRecord("More padding boo hoo".getBytes))
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
@@ -599,7 +600,7 @@ class LogTest extends JUnitSuite {
     val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
 
     val messages = (0 until numMessages).map { i =>
-      MemoryRecords.withLogEntries(LogEntry.create(100 + i, Record.create(Record.MAGIC_VALUE_V1, time.milliseconds + i, i.toString.getBytes())))
+      MemoryRecords.withRecords(100 + i, CompressionType.NONE, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
     }
     messages.foreach(log.append(_, assignOffsets = false))
     val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries }
@@ -637,7 +638,7 @@ class LogTest extends JUnitSuite {
     assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
     for(i <- 0 until numMessages) {
-      assertEquals(i, log.read(i, 100, None).records.shallowEntries.iterator.next().offset)
+      assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
       if (i == 0)
         assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
       else
@@ -715,7 +716,7 @@ class LogTest extends JUnitSuite {
     log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
-      assertEquals(i, log.read(i, 100, None).records.shallowEntries.iterator.next().offset)
+      assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
       if (i == 0)
         assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
       else
@@ -786,12 +787,12 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testIndexResizingAtTruncation() {
-    val setSize = TestUtils.singletonRecords(value = "test".getBytes).sizeInBytes
+    val setSize = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds).sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, (setSize - 1): java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, setSize - 1: java.lang.Integer)
     val config = LogConfig(logProps)
     val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
@@ -970,10 +971,10 @@ class LogTest extends JUnitSuite {
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
-    log.append(MemoryRecords.withRecords(Record.create(time.milliseconds, null, null)))
-    val head = log.read(0, 4096, None).records.shallowEntries().iterator.next()
+    log.append(TestUtils.singletonRecords(value = null))
+    val head = log.read(0, 4096, None).records.records.iterator.next()
     assertEquals(0, head.offset)
-    assertTrue("Message payload should be null.", head.record.hasNullValue)
+    assertTrue("Message payload should be null.", !head.hasValue)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -983,10 +984,10 @@ class LogTest extends JUnitSuite {
       recoveryPoint = 0L,
       time.scheduler,
       time)
-    val messages = (0 until 2).map(id => Record.create(time.milliseconds, null, id.toString.getBytes)).toArray
-    messages.foreach(record => log.append(MemoryRecords.withRecords(record)))
-    val invalidMessage = MemoryRecords.withRecords(Record.create(1.toString.getBytes))
-    log.append(invalidMessage, assignOffsets = false)
+    val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
+    records.foreach(record => log.append(MemoryRecords.withRecords(CompressionType.NONE, record)))
+    val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
+    log.append(invalidRecord, assignOffsets = false)
   }
 
   @Test
@@ -996,7 +997,8 @@ class LogTest extends JUnitSuite {
       recoveryPoint = 0L,
       time.scheduler,
       time)
-    log.append(MemoryRecords.withRecords(Record.create(Record.NO_TIMESTAMP, "key".getBytes, "value".getBytes)))
+    log.append(MemoryRecords.withRecords(CompressionType.NONE,
+      new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)))
   }
 
   @Test
@@ -1020,7 +1022,7 @@ class LogTest extends JUnitSuite {
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
         log.append(set)
-      val messages = log.logSegments.flatMap(_.log.deepEntries.asScala.toList)
+      val records = log.logSegments.flatMap(_.log.records.asScala.toList).toList
       log.close()
 
       // corrupt index and log by appending random bytes
@@ -1030,8 +1032,18 @@ class LogTest extends JUnitSuite {
       // attempt recovery
       log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
       assertEquals(numMessages, log.logEndOffset)
-      assertEquals("Messages in the log after recovery should be the same.", messages,
-        log.logSegments.flatMap(_.log.deepEntries.asScala.toList))
+
+      val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList
+      assertEquals(records.size, recovered.size)
+
+      for (i <- records.indices) {
+        val expected = records(i)
+        val actual = recovered(i)
+        assertEquals(s"Keys not equal", expected.key, actual.key)
+        assertEquals(s"Values not equal", expected.value, actual.value)
+        assertEquals(s"Timestamps not equal", expected.timestamp, actual.timestamp)
+      }
+
       Utils.delete(logDir)
     }
   }
@@ -1049,10 +1061,10 @@ class LogTest extends JUnitSuite {
       recoveryPoint = 0L,
       time.scheduler,
       time)
-    val set1 = MemoryRecords.withRecords(0, Record.create("v1".getBytes(), "k1".getBytes()))
-    val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, Record.create("v3".getBytes(), "k3".getBytes()))
-    val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, Record.create("v4".getBytes(), "k4".getBytes()))
-    val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, Record.create("v5".getBytes(), "k5".getBytes()))
+    val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
+    val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
+    val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, new SimpleRecord("v4".getBytes(), "k4".getBytes()))
+    val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, CompressionType.NONE, new SimpleRecord("v5".getBytes(), "k5".getBytes()))
     //Writes into an empty log with baseOffset 0
     log.append(set1, false)
     assertEquals(0L, log.activeSegment.baseOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index bb50497..903c394 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -31,102 +31,137 @@ import scala.collection.JavaConverters._
 class LogValidatorTest extends JUnitSuite {
 
   @Test
-  def testLogAppendTimeNonCompressed() {
+  def testLogAppendTimeNonCompressedV1() {
+    checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
+  }
+
+  private def checkLogAppendTimeNonCompressed(magic: Byte) {
     val now = System.currentTimeMillis()
     // The timestamps should be overwritten
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = 0L, codec = CompressionType.NONE)
+    val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.NONE)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
       now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageFormatVersion = magic,
       messageTimestampType = TimestampType.LOG_APPEND_TIME,
       messageTimestampDiffMaxMs = 1000L)
     val validatedRecords = validatedResults.validatedRecords
-    assertEquals("number of messages should not change", records.deepEntries.asScala.size, validatedRecords.deepEntries.asScala.size)
-    validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+    assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
+    validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
     assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
   }
 
+  def testLogAppendTimeNonCompressedV2() {
+    checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2)
+  }
+
   @Test
-  def testLogAppendTimeWithRecompression() {
+  def testLogAppendTimeWithRecompressionV1() {
+    checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1)
+  }
+
+  private def checkLogAppendTimeWithRecompression(targetMagic: Byte) {
     val now = System.currentTimeMillis()
     // The timestamps should be overwritten
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
       now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageFormatVersion = targetMagic,
       messageTimestampType = TimestampType.LOG_APPEND_TIME,
       messageTimestampDiffMaxMs = 1000L)
     val validatedRecords = validatedResults.validatedRecords
 
-    assertEquals("number of messages should not change", records.deepEntries.asScala.size, validatedRecords.deepEntries.asScala.size)
-    validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
-    assertTrue("MessageSet should still valid", validatedRecords.shallowEntries.iterator.next().record.isValid)
+    assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
+    validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
+    assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid)
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
-    assertEquals(s"The offset of max timestamp should be ${records.deepEntries.asScala.size - 1}",
-      records.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}",
+      records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged)
   }
 
   @Test
-  def testLogAppendTimeWithoutRecompression() {
+  def testLogAppendTimeWithRecompressionV2() {
+    checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V2)
+  }
+
+  @Test
+  def testLogAppendTimeWithoutRecompressionV1() {
+    checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V1)
+  }
+
+  private def checkLogAppendTimeWithoutRecompression(magic: Byte) {
     val now = System.currentTimeMillis()
     // The timestamps should be overwritten
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = 0L, codec = CompressionType.GZIP)
+    val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.GZIP)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
       now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageFormatVersion = magic,
       messageTimestampType = TimestampType.LOG_APPEND_TIME,
       messageTimestampDiffMaxMs = 1000L)
     val validatedRecords = validatedResults.validatedRecords
 
-    assertEquals("number of messages should not change", records.deepEntries.asScala.size,
-      validatedRecords.deepEntries.asScala.size)
-    validatedRecords.deepEntries.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
-    assertTrue("MessageSet should still valid", validatedRecords.shallowEntries.iterator.next().record.isValid)
+    assertEquals("message set size should not change", records.records.asScala.size,
+      validatedRecords.records.asScala.size)
+    validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
+    assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid)
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
-    assertEquals(s"The offset of max timestamp should be ${records.deepEntries.asScala.size - 1}",
-      records.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}",
+      records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
   }
 
   @Test
-  def testCreateTimeNonCompressed() {
+  def testLogAppendTimeWithoutRecompressionV2() {
+    checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2)
+  }
+
+  @Test
+  def testCreateTimeNonCompressedV1() {
+    checkCreateTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
+  }
+
+  def checkCreateTimeNonCompressed(magic: Byte) {
     val now = System.currentTimeMillis()
     val timestampSeq = Seq(now - 1, now + 1, now)
-    val records = MemoryRecords.withRecords(CompressionType.NONE,
-        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes),
-        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes),
-        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes))
+    val records =
+      MemoryRecords.withRecords(magic, CompressionType.NONE,
+        new SimpleRecord(timestampSeq(0), "hello".getBytes),
+        new SimpleRecord(timestampSeq(1), "there".getBytes),
+        new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
 
     val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageFormatVersion = magic,
       messageTimestampType = TimestampType.CREATE_TIME,
       messageTimestampDiffMaxMs = 1000L)
     val validatedRecords = validatingResults.validatedRecords
 
     var i = 0
-    for (logEntry <- validatedRecords.deepEntries.asScala) {
-      assertTrue(logEntry.record.isValid)
-      assertEquals(timestampSeq(i), logEntry.record.timestamp)
-      assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType)
-      i += 1
+    for (batch <- validatedRecords.batches.asScala) {
+      assertTrue(batch.isValid)
+      assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
+      assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
+      for (record <- batch.asScala) {
+        assertTrue(record.isValid)
+        assertEquals(timestampSeq(i), record.timestamp)
+        i += 1
+      }
     }
     assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
     assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.shallowOffsetOfMaxTimestamp)
@@ -134,65 +169,118 @@ class LogValidatorTest extends JUnitSuite {
   }
 
   @Test
-  def testCreateTimeUpConversion() {
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+  def testCreateTimeNonCompressedV2() {
+    checkCreateTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2)
+  }
+
+  @Test
+  def testCreateTimeUpConversionV0ToV1(): Unit = {
+    checkCreateTimeUpConvertionFromV0(RecordBatch.MAGIC_VALUE_V1)
+  }
+
+  private def checkCreateTimeUpConvertionFromV0(toMagic: Byte) {
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
-        offsetCounter = new LongRef(0),
-        now = System.currentTimeMillis(),
-        sourceCodec = DefaultCompressionCodec,
-        targetCodec = DefaultCompressionCodec,
-        messageFormatVersion = Record.MAGIC_VALUE_V1,
-        messageTimestampType = TimestampType.CREATE_TIME,
-        messageTimestampDiffMaxMs = 1000L)
+      offsetCounter = new LongRef(0),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = toMagic,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 1000L)
     val validatedRecords = validatedResults.validatedRecords
 
-    for (logEntry <- validatedRecords.deepEntries.asScala) {
-      assertTrue(logEntry.record.isValid)
-      assertEquals(Record.NO_TIMESTAMP, logEntry.record.timestamp)
-      assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType)
+    for (batch <- validatedRecords.batches.asScala) {
+      assertTrue(batch.isValid)
+      assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp)
+      assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
     }
-    assertEquals(s"Max timestamp should be ${Record.NO_TIMESTAMP}", Record.NO_TIMESTAMP, validatedResults.maxTimestamp)
-    assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepEntries.asScala.size - 1}",
-      validatedRecords.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertEquals(s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}", RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestamp)
+    assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
+      validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
   }
 
   @Test
-  def testCreateTimeCompressed() {
+  def testCreateTimeUpConversionV0ToV2() {
+    checkCreateTimeUpConvertionFromV0(RecordBatch.MAGIC_VALUE_V2)
+  }
+
+  @Test
+  def testCreateTimeUpConversionV1ToV2() {
+    val timestamp = System.currentTimeMillis()
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.GZIP, timestamp = timestamp)
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(0),
+      now = timestamp,
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 1000L)
+    val validatedRecords = validatedResults.validatedRecords
+
+    for (batch <- validatedRecords.batches.asScala) {
+      assertTrue(batch.isValid)
+      assertEquals(timestamp, batch.maxTimestamp)
+      assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
+    }
+    assertEquals(timestamp, validatedResults.maxTimestamp)
+    assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
+      validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
+  }
+
+  @Test
+  def testCreateTimeCompressedV1() {
+    createCreateTimeCompressed(RecordBatch.MAGIC_VALUE_V1)
+  }
+
+  def createCreateTimeCompressed(magic: Byte) {
     val now = System.currentTimeMillis()
     val timestampSeq = Seq(now - 1, now + 1, now)
-    val records = MemoryRecords.withRecords(CompressionType.GZIP,
-        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes),
-        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes),
-        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes))
+    val records =
+      MemoryRecords.withRecords(magic, CompressionType.GZIP,
+        new SimpleRecord(timestampSeq(0), "hello".getBytes),
+        new SimpleRecord(timestampSeq(1), "there".getBytes),
+        new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
 
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
         offsetCounter = new LongRef(0),
         now = System.currentTimeMillis(),
         sourceCodec = DefaultCompressionCodec,
         targetCodec = DefaultCompressionCodec,
-        messageFormatVersion = Record.MAGIC_VALUE_V1,
+        messageFormatVersion = magic,
         messageTimestampType = TimestampType.CREATE_TIME,
         messageTimestampDiffMaxMs = 1000L)
     val validatedRecords = validatedResults.validatedRecords
 
     var i = 0
-    for (logEntry <- validatedRecords.deepEntries.asScala) {
-      assertTrue(logEntry.record.isValid)
-      assertEquals(timestampSeq(i), logEntry.record.timestamp)
-      assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType)
-      i += 1
+    for (batch <- validatedRecords.batches.asScala) {
+      assertTrue(batch.isValid)
+      assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
+      assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
+      for (record <- batch.asScala) {
+        assertTrue(record.isValid)
+        assertEquals(timestampSeq(i), record.timestamp)
+        i += 1
+      }
     }
     assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatedResults.maxTimestamp)
-    assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepEntries.asScala.size - 1}",
-      validatedRecords.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
+      validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
   }
 
+  @Test
+  def testCreateTimeCompressedV2() {
+    createCreateTimeCompressed(RecordBatch.MAGIC_VALUE_V2)
+  }
+
   @Test(expected = classOf[InvalidTimestampException])
-  def testInvalidCreateTimeNonCompressed() {
+  def testInvalidCreateTimeNonCompressedV1() {
     val now = System.currentTimeMillis()
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now - 1001L,
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L,
       codec = CompressionType.NONE)
     LogValidator.validateMessagesAndAssignOffsets(
       records,
@@ -200,15 +288,31 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
       messageTimestampType = TimestampType.CREATE_TIME,
       messageTimestampDiffMaxMs = 1000L)
   }
 
   @Test(expected = classOf[InvalidTimestampException])
-  def testInvalidCreateTimeCompressed() {
+  def testInvalidCreateTimeNonCompressedV2() {
     val now = System.currentTimeMillis()
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now - 1001L,
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L,
+      codec = CompressionType.NONE)
+    LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(0),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 1000L)
+  }
+
+  @Test(expected = classOf[InvalidTimestampException])
+  def testInvalidCreateTimeCompressedV1() {
+    val now = System.currentTimeMillis()
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L,
       codec = CompressionType.GZIP)
     LogValidator.validateMessagesAndAssignOffsets(
       records,
@@ -216,13 +320,30 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
       messageTimestampType = TimestampType.CREATE_TIME,
       messageTimestampDiffMaxMs = 1000L)
   }
+
+  @Test(expected = classOf[InvalidTimestampException])
+  def testInvalidCreateTimeCompressedV2() {
+    val now = System.currentTimeMillis()
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L,
+      codec = CompressionType.GZIP)
+    LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(0),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 1000L)
+  }
+
   @Test
   def testAbsoluteOffsetAssignmentNonCompressed() {
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.NONE)
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
     val offset = 1234567
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
@@ -230,14 +351,14 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V0,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
       messageTimestampType = TimestampType.CREATE_TIME,
       messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
   }
 
   @Test
   def testAbsoluteOffsetAssignmentCompressed() {
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
     val offset = 1234567
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
@@ -245,15 +366,15 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V0,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
       messageTimestampType = TimestampType.CREATE_TIME,
       messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
   }
 
   @Test
-  def testRelativeOffsetAssignmentNonCompressed() {
+  def testRelativeOffsetAssignmentNonCompressedV1() {
     val now = System.currentTimeMillis()
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.NONE)
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.NONE)
     val offset = 1234567
     checkOffsets(records, 0)
     val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records,
@@ -261,15 +382,33 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
       messageTimestampType = TimestampType.CREATE_TIME,
       messageTimestampDiffMaxMs = 5000L).validatedRecords
     checkOffsets(messageWithOffset, offset)
   }
 
   @Test
-  def testRelativeOffsetAssignmentCompressed() {
+  def testRelativeOffsetAssignmentNonCompressedV2() {
     val now = System.currentTimeMillis()
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.GZIP)
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now, codec = CompressionType.NONE)
+    val offset = 1234567
+    checkOffsets(records, 0)
+    val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedRecords
+    checkOffsets(messageWithOffset, offset)
+  }
+
+  @Test
+  def testRelativeOffsetAssignmentCompressedV1() {
+    val now = System.currentTimeMillis()
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.GZIP)
     val offset = 1234567
     checkOffsets(records, 0)
     val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets(
@@ -278,14 +417,125 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
       messageTimestampType = TimestampType.CREATE_TIME,
       messageTimestampDiffMaxMs = 5000L).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
   @Test
-  def testOffsetAssignmentAfterMessageFormatConversionV0NonCompressed() {
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.NONE)
+  def testRelativeOffsetAssignmentCompressedV2() {
+    val now = System.currentTimeMillis()
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now, codec = CompressionType.GZIP)
+    val offset = 1234567
+    checkOffsets(records, 0)
+    val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedRecords
+    checkOffsets(compressedMessagesWithOffset, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterUpConversionV0ToV1NonCompressed() {
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
+    checkOffsets(records, 0)
+    val offset = 1234567
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterUpConversionV0ToV2NonCompressed() {
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
+    checkOffsets(records, 0)
+    val offset = 1234567
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterUpConversionV0ToV1Compressed() {
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+    val offset = 1234567
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterUpConversionV0ToV2Compressed() {
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+    val offset = 1234567
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
+      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterDownConversionV1ToV0NonCompressed() {
+    val offset = 1234567
+    val now = System.currentTimeMillis()
+    val records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, codec = CompressionType.NONE)
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterDownConversionV1ToV0Compressed() {
+    val offset = 1234567
+    val now = System.currentTimeMillis()
+    val records = createRecords(RecordBatch.MAGIC_VALUE_V1, now, CompressionType.GZIP)
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterUpConversionV1ToV2NonCompressed() {
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.NONE)
     checkOffsets(records, 0)
     val offset = 1234567
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
@@ -293,14 +543,14 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
       messageTimestampType = TimestampType.LOG_APPEND_TIME,
       messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
   }
 
   @Test
-  def testOffsetAssignmentAfterMessageFormatConversionV0Compressed() {
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+  def testOffsetAssignmentAfterUpConversionV1ToV2Compressed() {
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.GZIP)
     val offset = 1234567
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
@@ -308,39 +558,71 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
       messageTimestampType = TimestampType.LOG_APPEND_TIME,
       messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
   }
 
   @Test
-  def testOffsetAssignmentAfterMessageFormatConversionV1NonCompressed() {
+  def testOffsetAssignmentAfterDownConversionV2ToV1NonCompressed() {
     val offset = 1234567
     val now = System.currentTimeMillis()
-    val records = createRecords(Record.MAGIC_VALUE_V1, now, codec = CompressionType.NONE)
+    val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, codec = CompressionType.NONE)
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V0,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
       messageTimestampType = TimestampType.CREATE_TIME,
       messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
   }
 
   @Test
-  def testOffsetAssignmentAfterMessageFormatConversionV1Compressed() {
+  def testOffsetAssignmentAfterDownConversionV2ToV1Compressed() {
     val offset = 1234567
     val now = System.currentTimeMillis()
-    val records = createRecords(Record.MAGIC_VALUE_V1, now, CompressionType.GZIP)
+    val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.GZIP)
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = Record.MAGIC_VALUE_V0,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterDownConversionV2ToV0NonCompressed() {
+    val offset = 1234567
+    val now = System.currentTimeMillis()
+    val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, codec = CompressionType.NONE)
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterDownConversionV2ToV0Compressed() {
+    val offset = 1234567
+    val now = System.currentTimeMillis()
+    val records = createRecords(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.GZIP)
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
       messageTimestampType = TimestampType.CREATE_TIME,
       messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
   }
@@ -354,33 +636,27 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = SnappyCompressionCodec,
       targetCodec = SnappyCompressionCodec,
+      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
       messageTimestampType = TimestampType.CREATE_TIME,
       messageTimestampDiffMaxMs = 5000L)
   }
 
   private def createRecords(magicValue: Byte = Message.CurrentMagicValue,
-                              timestamp: Long = Message.NoTimestamp,
-                              codec: CompressionType = CompressionType.NONE): MemoryRecords = {
-    if (magicValue == Record.MAGIC_VALUE_V0) {
-      MemoryRecords.withRecords(
-        codec,
-        Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "hello".getBytes),
-        Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "there".getBytes),
-        Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "beautiful".getBytes))
-    } else {
-      MemoryRecords.withRecords(
-        codec,
-        Record.create(Record.MAGIC_VALUE_V1, timestamp, "hello".getBytes),
-        Record.create(Record.MAGIC_VALUE_V1, timestamp, "there".getBytes),
-        Record.create(Record.MAGIC_VALUE_V1, timestamp, "beautiful".getBytes))
-    }
+                            timestamp: Long = Message.NoTimestamp,
+                            codec: CompressionType = CompressionType.NONE): MemoryRecords = {
+    val buf = ByteBuffer.allocate(512)
+    val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
+    builder.appendWithOffset(0, timestamp, null, "hello".getBytes)
+    builder.appendWithOffset(1, timestamp, null, "there".getBytes)
+    builder.appendWithOffset(2, timestamp, null, "beautiful".getBytes)
+    builder.build()
   }
 
   /* check that offsets are assigned consecutively from the given base offset */
-  private def checkOffsets(records: MemoryRecords, baseOffset: Long) {
-    assertTrue("Message set should not be empty", records.deepEntries.asScala.nonEmpty)
+  def checkOffsets(records: MemoryRecords, baseOffset: Long) {
+    assertTrue("Message set should not be empty", records.records.asScala.nonEmpty)
     var offset = baseOffset
-    for (entry <- records.deepEntries.asScala) {
+    for (entry <- records.records.asScala) {
       assertEquals("Unexpected offset in message set iterator", offset, entry.offset)
       offset += 1
     }
@@ -388,28 +664,32 @@ class LogValidatorTest extends JUnitSuite {
 
   private def recordsWithInvalidInnerMagic(initialOffset: Long): MemoryRecords = {
     val records = (0 until 20).map(id =>
-      Record.create(Record.MAGIC_VALUE_V0,
-        Record.NO_TIMESTAMP,
+      LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0,
+        RecordBatch.NO_TIMESTAMP,
         id.toString.getBytes,
         id.toString.getBytes))
 
     val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16))
-    val builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.GZIP,
-      TimestampType.CREATE_TIME)
+    val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP,
+      TimestampType.CREATE_TIME, 0L)
 
     var offset = initialOffset
     records.foreach { record =>
-      builder.appendUnchecked(offset, record)
+      builder.appendUncheckedWithOffset(offset, record)
       offset += 1
     }
 
     builder.build()
   }
 
-  def validateLogAppendTime(now: Long, record: Record) {
-    record.ensureValid()
-    assertEquals(s"Timestamp of message $record should be $now", now, record.timestamp)
-    assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType)
+  def validateLogAppendTime(now: Long, batch: RecordBatch) {
+    assertTrue(batch.isValid)
+    assertTrue(batch.timestampType() == TimestampType.LOG_APPEND_TIME)
+    assertEquals(s"Timestamp of message $batch should be $now", now, batch.maxTimestamp)
+    for (record <- batch.asScala) {
+      assertTrue(record.isValid)
+      assertEquals(s"Timestamp of message $record should be $now", now, record.timestamp)
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index b7d2fa1..40581ed 100644
--- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -124,7 +124,8 @@ trait BaseMessageSetTestCases extends JUnitSuite {
         val written = write(channel)
         assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
         val fileRecords = new FileRecords(file, channel, 0, Integer.MAX_VALUE, false)
-        assertEquals(set.asRecords.deepEntries.asScala.toVector, fileRecords.deepEntries.asScala.toVector)
+        assertEquals(set.asRecords.records.asScala.toVector, fileRecords.records.asScala.toVector)
+        checkEquals(set.asRecords.records.iterator, fileRecords.records.iterator)
       } finally channel.close()
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 17056c9..ed873d0 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, NetworkSend}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.{RecordBatch, MemoryRecords}
 import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
@@ -117,7 +117,8 @@ class SocketServerTest extends JUnitSuite {
     val ackTimeoutMs = 10000
     val ack = 0: Short
 
-    val emptyRequest = new ProduceRequest.Builder(ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build()
+    val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
+      new HashMap[TopicPartition, MemoryRecords]()).build()
     val emptyHeader = new RequestHeader(apiKey, emptyRequest.version, clientId, correlationId)
     val byteBuffer = emptyRequest.serialize(emptyHeader)
     byteBuffer.rewind()
@@ -287,7 +288,8 @@ class SocketServerTest extends JUnitSuite {
       val clientId = ""
       val ackTimeoutMs = 10000
       val ack = 0: Short
-      val emptyRequest = new ProduceRequest.Builder(ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build()
+      val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
+        new HashMap[TopicPartition, MemoryRecords]()).build()
       val emptyHeader = new RequestHeader(apiKey, emptyRequest.version, clientId, correlationId)
 
       val byteBuffer = emptyRequest.serialize(emptyHeader)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 189e21b..41a8a6c 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -29,6 +29,7 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.record.{DefaultRecordBatch, DefaultRecord}
 import org.junit.Test
 import org.junit.Assert._
 
@@ -119,7 +120,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     assertEquals(Errors.MESSAGE_TOO_LARGE, response1.status(TopicAndPartition("test", 0)).error)
     assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
 
-    val safeSize = configs.head.messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1
+    val safeSize = configs.head.messageMaxBytes - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD
     val message2 = new Message(new Array[Byte](safeSize))
     val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
     val response2 = producer.send(produceRequest("test", 0, messageSet2, acks = 1))
@@ -129,7 +130,6 @@ class SyncProducerTest extends KafkaServerTestHarness {
     assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
   }
 
-
   @Test
   def testMessageSizeTooLargeWithAckZero() {
     val server = servers.head

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index a5d8102..581a917 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -23,8 +23,7 @@ import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.ByteUtils
-import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.{Before, Test}
 
@@ -156,8 +155,8 @@ class AbstractFetcherThreadTest {
     @volatile var fetchCount = 0
 
     private val normalPartitionDataSet = List(
-      new TestPartitionData(MemoryRecords.withRecords(0L, Record.create("hello".getBytes()))),
-      new TestPartitionData(MemoryRecords.withRecords(1L, Record.create("hello".getBytes())))
+      new TestPartitionData(MemoryRecords.withRecords(0L, CompressionType.NONE, new SimpleRecord("hello".getBytes()))),
+      new TestPartitionData(MemoryRecords.withRecords(1L, CompressionType.NONE, new SimpleRecord("hello".getBytes())))
     )
 
     override def processPartitionData(topicPartition: TopicPartition,
@@ -171,9 +170,9 @@ class AbstractFetcherThreadTest {
 
       // Now check message's crc
       val records = partitionData.toRecords
-      for (entry <- records.shallowEntries.asScala) {
-        entry.record.ensureValid()
-        logEndOffset = entry.nextOffset
+      for (batch <- records.batches.asScala) {
+        batch.ensureValid()
+        logEndOffset = batch.nextOffset
       }
     }
 
@@ -181,15 +180,18 @@ class AbstractFetcherThreadTest {
       fetchCount += 1
       // Set the first fetch to get a corrupted message
       if (fetchCount == 1) {
-        val corruptedRecord = Record.create("hello".getBytes())
-        val badChecksum = (corruptedRecord.checksum + 1 % Int.MaxValue).toInt
-        // Garble checksum
-        ByteUtils.writeUnsignedInt(corruptedRecord.buffer, Record.CRC_OFFSET, badChecksum)
-        val records = MemoryRecords.withRecords(corruptedRecord)
+        val record = new SimpleRecord("hello".getBytes())
+        val records = MemoryRecords.withRecords(CompressionType.NONE, record)
+        val buffer = records.buffer
+
+        // flip some bits in the message to ensure the crc fails
+        buffer.putInt(15, buffer.getInt(15) ^ 23422)
+        buffer.putInt(30, buffer.getInt(30) ^ 93242)
         fetchRequest.offsets.mapValues(_ => new TestPartitionData(records)).toSeq
-      } else
-      // Then, the following fetches get the normal data
+      } else {
+        // Then, the following fetches get the normal data
         fetchRequest.offsets.mapValues(v => normalPartitionDataSet(v.toInt)).toSeq
+      }
     }
 
     override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 2d4a22a..0deb26d 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.types.Type
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.{CompressionType, SimpleRecord, RecordBatch, MemoryRecords}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse, ResponseHeader}
 import org.junit.Assert._
 import org.junit.Test
@@ -116,10 +116,11 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
 
     val version = 2: Short
     val serializedBytes = {
-      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, null, correlationId)
-      val messageBytes = "message".getBytes
-      val records = MemoryRecords.readableRecords(ByteBuffer.wrap(messageBytes))
-      val request = new ProduceRequest.Builder(1, 10000, Map(topicPartition -> records).asJava).build()
+      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, ApiKeys.PRODUCE.latestVersion, null,
+        correlationId)
+      val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes))
+      val request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 10000,
+        Map(topicPartition -> records).asJava).build()
       val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.toStruct.sizeOf)
       byteBuffer.put(headerBytes)
       request.toStruct.writeTo(byteBuffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 5616956..b350732 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.TestUtils._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.LogEntry
+import org.apache.kafka.common.record.Record
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.junit.Assert._
@@ -53,7 +53,6 @@ class FetchRequestTest extends BaseRequestTest {
     super.tearDown()
   }
 
-
   private def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
                                  offsetMap: Map[TopicPartition, Long] = Map.empty): FetchRequest =
     FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap))
@@ -68,9 +67,10 @@ class FetchRequestTest extends BaseRequestTest {
     partitionMap
   }
 
-  private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse = {
+  private def sendFetchRequest(leaderId: Int, request: FetchRequest,
+                               version: Short = ApiKeys.FETCH.latestVersion): FetchResponse = {
     val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
-    FetchResponse.parse(response, ApiKeys.FETCH.latestVersion)
+    FetchResponse.parse(response, version)
   }
 
   @Test
@@ -121,13 +121,13 @@ class FetchRequestTest extends BaseRequestTest {
     val fetchResponse3 = sendFetchRequest(leaderId, fetchRequest3)
     assertEquals(shuffledTopicPartitions3, fetchResponse3.responseData.keySet.asScala.toSeq)
     val responseSize3 = fetchResponse3.responseData.asScala.values.map { partitionData =>
-      logEntries(partitionData).map(_.sizeInBytes).sum
+      records(partitionData).map(_.sizeInBytes).sum
     }.sum
     assertTrue(responseSize3 <= maxResponseBytes)
     val partitionData3 = fetchResponse3.responseData.get(partitionWithLargeMessage1)
     assertEquals(Errors.NONE, partitionData3.error)
     assertTrue(partitionData3.highWatermark > 0)
-    val size3 = logEntries(partitionData3).map(_.sizeInBytes).sum
+    val size3 = records(partitionData3).map(_.sizeInBytes).sum
     assertTrue(s"Expected $size3 to be smaller than $maxResponseBytes", size3 <= maxResponseBytes)
     assertTrue(s"Expected $size3 to be larger than $maxPartitionBytes", size3 > maxPartitionBytes)
     assertTrue(maxPartitionBytes < partitionData3.records.sizeInBytes)
@@ -139,13 +139,13 @@ class FetchRequestTest extends BaseRequestTest {
     val fetchResponse4 = sendFetchRequest(leaderId, fetchRequest4)
     assertEquals(shuffledTopicPartitions4, fetchResponse4.responseData.keySet.asScala.toSeq)
     val nonEmptyPartitions4 = fetchResponse4.responseData.asScala.toSeq.collect {
-      case (tp, partitionData) if logEntries(partitionData).map(_.sizeInBytes).sum > 0 => tp
+      case (tp, partitionData) if records(partitionData).map(_.sizeInBytes).sum > 0 => tp
     }
     assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4)
     val partitionData4 = fetchResponse4.responseData.get(partitionWithLargeMessage2)
     assertEquals(Errors.NONE, partitionData4.error)
     assertTrue(partitionData4.highWatermark > 0)
-    val size4 = logEntries(partitionData4).map(_.sizeInBytes).sum
+    val size4 = records(partitionData4).map(_.sizeInBytes).sum
     assertTrue(s"Expected $size4 to be larger than $maxResponseBytes", size4 > maxResponseBytes)
     assertTrue(maxResponseBytes < partitionData4.records.sizeInBytes)
   }
@@ -158,16 +158,16 @@ class FetchRequestTest extends BaseRequestTest {
       "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
     val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0,
       createPartitionMap(maxPartitionBytes, Seq(topicPartition))).build(2)
-    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest, version = 2)
     val partitionData = fetchResponse.responseData.get(topicPartition)
     assertEquals(Errors.NONE, partitionData.error)
     assertTrue(partitionData.highWatermark > 0)
     assertEquals(maxPartitionBytes, partitionData.records.sizeInBytes)
-    assertEquals(0, logEntries(partitionData).map(_.sizeInBytes).sum)
+    assertEquals(0, records(partitionData).map(_.sizeInBytes).sum)
   }
 
-  private def logEntries(partitionData: FetchResponse.PartitionData): Seq[LogEntry] = {
-    partitionData.records.deepEntries.asScala.toIndexedSeq
+  private def records(partitionData: FetchResponse.PartitionData): Seq[Record] = {
+    partitionData.records.records.asScala.toIndexedSeq
   }
 
   private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], fetchResponse: FetchResponse,
@@ -185,23 +185,22 @@ class FetchRequestTest extends BaseRequestTest {
       val records = partitionData.records
       responseBufferSize += records.sizeInBytes
 
-      val entries = records.shallowEntries.asScala.toIndexedSeq
-      assertTrue(entries.size < numMessagesPerPartition)
-      val entriesSize = entries.map(_.sizeInBytes).sum
-      responseSize += entriesSize
-      if (entriesSize == 0 && !emptyResponseSeen) {
+      val batches = records.batches.asScala.toIndexedSeq
+      assertTrue(batches.size < numMessagesPerPartition)
+      val batchesSize = batches.map(_.sizeInBytes).sum
+      responseSize += batchesSize
+      if (batchesSize == 0 && !emptyResponseSeen) {
         assertEquals(0, records.sizeInBytes)
         emptyResponseSeen = true
       }
-      else if (entriesSize != 0 && !emptyResponseSeen) {
-        assertTrue(entriesSize <= maxPartitionBytes)
+      else if (batchesSize != 0 && !emptyResponseSeen) {
+        assertTrue(batchesSize <= maxPartitionBytes)
         assertEquals(maxPartitionBytes, records.sizeInBytes)
       }
-      else if (entriesSize != 0 && emptyResponseSeen)
-        fail(s"Expected partition with size 0, but found $tp with size $entriesSize")
+      else if (batchesSize != 0 && emptyResponseSeen)
+        fail(s"Expected partition with size 0, but found $tp with size $batchesSize")
       else if (records.sizeInBytes != 0 && emptyResponseSeen)
         fail(s"Expected partition buffer with size 0, but found $tp with size ${records.sizeInBytes}")
-
     }
 
     assertEquals(maxResponseBytes - maxResponseBytes % maxPartitionBytes, responseBufferSize)
@@ -227,7 +226,7 @@ class FetchRequestTest extends BaseRequestTest {
       val suffix = s"$tp-$messageIndex"
       new ProducerRecord(tp.topic, tp.partition, s"key $suffix", s"value $suffix")
     }
-    records.map(producer.send).foreach(_.get)
+    records.map(producer.send(_).get)
     records
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index ac766dc..9386a1d 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -31,7 +31,6 @@ import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
@@ -42,7 +41,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
   var logDir: File = null
   var topicLogDir: File = null
   var server: KafkaServer = null
-  var logSize: Int = 100
+  var logSize: Int = 140
   var simpleConsumer: SimpleConsumer = null
   var time: Time = new MockTime()
 
@@ -90,9 +89,8 @@ class LogOffsetTest extends ZooKeeperTestHarness {
                   "Log for partition [topic,0] should be created")
     val log = logManager.getLog(new TopicPartition(topic, part)).get
 
-    val record = Record.create(Integer.toString(42).getBytes())
     for (_ <- 0 until 20)
-      log.append(MemoryRecords.withRecords(record))
+      log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
     log.flush()
 
     val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
@@ -151,9 +149,9 @@ class LogOffsetTest extends ZooKeeperTestHarness {
 
     val logManager = server.getLogManager
     val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig)
-    val record = Record.create(Integer.toString(42).getBytes())
+
     for (_ <- 0 until 20)
-      log.append(MemoryRecords.withRecords(record))
+      log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
     log.flush()
 
     val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
@@ -180,9 +178,8 @@ class LogOffsetTest extends ZooKeeperTestHarness {
 
     val logManager = server.getLogManager
     val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig)
-    val record = Record.create(Integer.toString(42).getBytes())
     for (_ <- 0 until 20)
-      log.append(MemoryRecords.withRecords(record))
+      log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
     log.flush()
 
     val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.EarliestTime, 10)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 81118fa..2f16719 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -20,7 +20,7 @@ package kafka.server
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
+import org.apache.kafka.common.record.{CompressionType, DefaultRecordBatch, MemoryRecords, RecordBatch, SimpleRecord}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.junit.Assert._
 import org.junit.Test
@@ -41,7 +41,7 @@ class ProduceRequestTest extends BaseRequestTest {
       val topicPartition = new TopicPartition("topic", partition)
       val partitionRecords = Map(topicPartition -> memoryRecords)
       val produceResponse = sendProduceRequest(leader,
-          new ProduceRequest.Builder(-1, 3000, partitionRecords.asJava).build())
+          new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, -1, 3000, partitionRecords.asJava).build())
       assertEquals(1, produceResponse.responses.size)
       val (tp, partitionResponse) = produceResponse.responses.asScala.head
       assertEquals(topicPartition, tp)
@@ -51,12 +51,12 @@ class ProduceRequestTest extends BaseRequestTest {
       partitionResponse
     }
 
-    sendAndCheck(MemoryRecords.withRecords(
-      Record.create(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
+    sendAndCheck(MemoryRecords.withRecords(CompressionType.NONE,
+      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
 
     sendAndCheck(MemoryRecords.withRecords(CompressionType.GZIP,
-      Record.create(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
-      Record.create(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
+      new SimpleRecord(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
+      new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
   }
 
   /* returns a pair of partition id and leader id */
@@ -72,13 +72,14 @@ class ProduceRequestTest extends BaseRequestTest {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
     val timestamp = 1000000
     val memoryRecords = MemoryRecords.withRecords(CompressionType.LZ4,
-      Record.create(timestamp, "key".getBytes, "value".getBytes))
-    // Change the lz4 checksum value so that it doesn't match the contents
-    memoryRecords.buffer.array.update(40, 0)
+      new SimpleRecord(timestamp, "key".getBytes, "value".getBytes))
+    // Change the lz4 checksum value (not the kafka record crc) so that it doesn't match the contents
+    val lz4ChecksumOffset = 6
+    memoryRecords.buffer.array.update(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + lz4ChecksumOffset, 0)
     val topicPartition = new TopicPartition("topic", partition)
     val partitionRecords = Map(topicPartition -> memoryRecords)
     val produceResponse = sendProduceRequest(leader, 
-      new ProduceRequest.Builder(-1, 3000, partitionRecords.asJava).build())
+      new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, -1, 3000, partitionRecords.asJava).build())
     assertEquals(1, produceResponse.responses.size)
     val (tp, partitionResponse) = produceResponse.responses.asScala.head
     assertEquals(topicPartition, tp)