Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/24 19:43:56 UTC

[03/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 8315e64..038c15b 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -27,7 +27,7 @@ import kafka.serializer.Decoder
 import kafka.utils._
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.record.{CompressionType, FileRecords, LogEntry, Record}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable
@@ -131,7 +131,7 @@ object DumpLogSegments {
                         verifyOnly: Boolean,
                         misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]],
                         maxMessageSize: Int) {
-    val startOffset = file.getName().split("\\.")(0).toLong
+    val startOffset = file.getName.split("\\.")(0).toLong
     val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
     val fileRecords = FileRecords.open(logFile, false)
     val index = new OffsetIndex(file, baseOffset = startOffset)
@@ -146,10 +146,10 @@ object DumpLogSegments {
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
       val slice = fileRecords.read(entry.position, maxMessageSize)
-      val logEntry = getIterator(slice.shallowEntries.iterator.next(), isDeepIteration = true).next()
-      if (logEntry.offset != entry.offset + index.baseOffset) {
+      val firstRecord = slice.records.iterator.next()
+      if (firstRecord.offset != entry.offset + index.baseOffset) {
         var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
-        misMatchesSeq ::=(entry.offset + index.baseOffset, logEntry.offset)
+        misMatchesSeq ::=(entry.offset + index.baseOffset, firstRecord.offset)
         misMatchesForIndexFilesMap.put(file.getAbsolutePath, misMatchesSeq)
       }
       // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
@@ -179,26 +179,23 @@ object DumpLogSegments {
       return
     }
 
-    var prevTimestamp = Record.NO_TIMESTAMP
+    var prevTimestamp = RecordBatch.NO_TIMESTAMP
     for(i <- 0 until timeIndex.entries) {
       val entry = timeIndex.entry(i)
       val position = index.lookup(entry.offset + timeIndex.baseOffset).position
       val partialFileRecords = fileRecords.read(position, Int.MaxValue)
-      val shallowEntries = partialFileRecords.shallowEntries.asScala
-      var maxTimestamp = Record.NO_TIMESTAMP
+      val batches = partialFileRecords.batches.asScala
+      var maxTimestamp = RecordBatch.NO_TIMESTAMP
       // We first find the message by offset then check if the timestamp is correct.
-      val maybeLogEntry = shallowEntries.find(_.offset >= entry.offset + timeIndex.baseOffset)
-      maybeLogEntry match {
+      batches.find(_.lastOffset >= entry.offset + timeIndex.baseOffset) match {
         case None =>
           timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset,
             -1.toLong)
-        case Some(logEntry) if logEntry.offset != entry.offset + timeIndex.baseOffset =>
-          timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset,
-            logEntry.offset)
-        case Some(shallowLogEntry) =>
-          val deepIter = getIterator(shallowLogEntry, isDeepIteration = true)
-          for (deepLogEntry <- deepIter)
-            maxTimestamp = math.max(maxTimestamp, deepLogEntry.record.timestamp)
+        case Some(batch) if batch.lastOffset != entry.offset + timeIndex.baseOffset =>
+          timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset, batch.lastOffset)
+        case Some(batch) =>
+          for (record <- batch.asScala)
+            maxTimestamp = math.max(maxTimestamp, record.timestamp)
 
           if (maxTimestamp != entry.timestamp)
             timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, maxTimestamp)
@@ -222,7 +219,7 @@ object DumpLogSegments {
 
   private class DecoderMessageParser[K, V](keyDecoder: Decoder[K], valueDecoder: Decoder[V]) extends MessageParser[K, V] {
     override def parse(record: Record): (Option[K], Option[V]) = {
-      if (record.hasNullValue) {
+      if (!record.hasValue) {
         (None, None)
       } else {
         val key = if (record.hasKey)
@@ -285,7 +282,7 @@ object DumpLogSegments {
     }
 
     override def parse(record: Record): (Option[String], Option[String]) = {
-      if (record.hasNullValue)
+      if (!record.hasValue)
         (None, None)
       else if (!record.hasKey) {
         throw new KafkaException("Failed to decode message using offset topic decoder (message had a missing key)")
@@ -306,54 +303,65 @@ object DumpLogSegments {
                       isDeepIteration: Boolean,
                       maxMessageSize: Int,
                       parser: MessageParser[_, _]) {
-    val startOffset = file.getName().split("\\.")(0).toLong
+    val startOffset = file.getName.split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val messageSet = FileRecords.open(file, false)
     var validBytes = 0L
-    var lastOffset = -1l
-    for (shallowLogEntry <- messageSet.shallowEntries(maxMessageSize).asScala) {
-      val itr = getIterator(shallowLogEntry, isDeepIteration)
-      for (deepLogEntry <- itr) {
-        val record = deepLogEntry.record()
-
-        if(lastOffset == -1)
-          lastOffset = deepLogEntry.offset
-        // If we are iterating uncompressed messages, offsets must be consecutive
-        else if (record.compressionType == CompressionType.NONE && deepLogEntry.offset != lastOffset +1) {
-          var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
-          nonConsecutivePairsSeq ::=(lastOffset, deepLogEntry.offset)
-          nonConsecutivePairsForLogFilesMap.put(file.getAbsolutePath, nonConsecutivePairsSeq)
+    var lastOffset = -1L
+    val batches = messageSet.batches(maxMessageSize).asScala
+    for (batch <- batches) {
+      if (isDeepIteration) {
+        for (record <- batch.asScala) {
+          if (lastOffset == -1)
+            lastOffset = record.offset
+          else if (record.offset != lastOffset + 1) {
+            var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getAbsolutePath, List[(Long, Long)]())
+            nonConsecutivePairsSeq ::= (lastOffset, record.offset)
+            nonConsecutivePairsForLogFilesMap.put(file.getAbsolutePath, nonConsecutivePairsSeq)
+          }
+          lastOffset = record.offset
+
+          print("offset: " + record.offset + " position: " + validBytes +
+            " " + batch.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid +
+            " keysize: " + record.keySize + " valuesize: " + record.valueSize + " magic: " + batch.magic +
+            " compresscodec: " + batch.compressionType + " crc: " + record.checksum)
+
+          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+            print(" sequence: " + record.sequence +
+              " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
+          }
+
+          if (record.isControlRecord) {
+            val controlType = ControlRecordType.parse(record.key)
+            print(s" controlType: $controlType")
+          } else if (printContents) {
+            val (key, payload) = parser.parse(record)
+            key.foreach(key => print(s" key: $key"))
+            payload.foreach(payload => print(s" payload: $payload"))
+          }
+          println()
         }
-        lastOffset = deepLogEntry.offset
-
-        print("offset: " + deepLogEntry.offset + " position: " + validBytes +
-              " " + record.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid +
-              " payloadsize: " + record.valueSize + " magic: " + record.magic +
-              " compresscodec: " + record.compressionType + " crc: " + record.checksum)
-        if (record.hasKey)
-          print(" keysize: " + record.keySize)
-        if (printContents) {
-          val (key, payload) = parser.parse(record)
-          key.foreach(key => print(s" key: $key"))
-          payload.foreach(payload => print(s" payload: $payload"))
-        }
-        println()
-      }
+      } else {
+        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+          print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset +
+            " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
+            " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
+            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional)
+        else
+          print("offset: " + batch.lastOffset)
 
-      validBytes += shallowLogEntry.sizeInBytes
+        println(" position: " + validBytes + " " + batch.timestampType + ": " + batch.maxTimestamp +
+          " isvalid: " + batch.isValid +
+          " size: " + batch.sizeInBytes + " magic: " + batch.magic +
+          " compresscodec: " + batch.compressionType + " crc: " + batch.checksum)
+      }
+      validBytes += batch.sizeInBytes
     }
     val trailingBytes = messageSet.sizeInBytes - validBytes
     if(trailingBytes > 0)
       println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
   }
 
-  private def getIterator(logEntry: LogEntry, isDeepIteration: Boolean): Iterator[LogEntry] = {
-    if (isDeepIteration)
-      logEntry.iterator.asScala
-    else
-      Iterator(logEntry)
-  }
-
   class TimeIndexDumpErrors {
     val misMatchesForTimeIndexFilesMap = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
     val outOfOrderTimestamp = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
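
The DumpLogSegments changes above replace the old shallowEntries/deepEntries iteration with the batch/record split introduced by the new message format. A minimal sketch of the two views, assuming the FileRecords and RecordBatch APIs exactly as they appear in this patch (the segment path is a placeholder argument):

    import java.io.File
    import org.apache.kafka.common.record.FileRecords
    import scala.collection.JavaConverters._

    object BatchIterationSketch {
      def main(args: Array[String]): Unit = {
        // Open a log segment read-only; args(0) is a placeholder for a .log file path.
        val fileRecords = FileRecords.open(new File(args(0)), false)
        try {
          // Batch-level view (roughly what the tool prints without deep iteration).
          for (batch <- fileRecords.batches.asScala)
            println(s"batch baseOffset=${batch.baseOffset} lastOffset=${batch.lastOffset} " +
              s"magic=${batch.magic} compression=${batch.compressionType} crc=${batch.checksum}")

          // Record-level view: walks the individual records inside each batch,
          // decompressing where necessary.
          for (batch <- fileRecords.batches.asScala; record <- batch.asScala)
            println(s"record offset=${record.offset} timestamp=${record.timestamp} " +
              s"keySize=${record.keySize} valueSize=${record.valueSize}")
        } finally fileRecords.close()
      }
    }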

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index a2866ad..d359c1a 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -38,12 +38,12 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
-import org.apache.kafka.common.record.Record
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.util.control.ControlThrowable
 import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
+import org.apache.kafka.common.record.RecordBatch
 
 /**
  * The mirror maker has the following architecture:
@@ -752,7 +752,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
   private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
     override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
-      val timestamp: java.lang.Long = if (record.timestamp == Record.NO_TIMESTAMP) null else record.timestamp
+      val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
       Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, timestamp, record.key, record.value))
     }
   }
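
Only the sentinel moved here: the no-timestamp marker now lives on RecordBatch rather than Record. A small sketch of the mapping, assuming the same ProducerRecord constructor used above (a null timestamp tells the producer to assign its own):

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.record.RecordBatch

    object TimestampMappingSketch {
      // NO_TIMESTAMP marks records without a timestamp, e.g. data written with magic v0.
      def mirrorRecord(topic: String, timestamp: Long, key: Array[Byte], value: Array[Byte]) = {
        val ts: java.lang.Long = if (timestamp == RecordBatch.NO_TIMESTAMP) null else timestamp
        new ProducerRecord[Array[Byte], Array[Byte]](topic, null, ts, key, value)
      }
    }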

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 492b304..c9c2e44 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -34,9 +34,6 @@ import kafka.utils._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.Time
 
-import scala.collection.JavaConverters._
-
-
 /**
  *  For verifying the consistency among replicas.
  *
@@ -274,8 +271,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
       assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
             "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
             + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
-      val logEntryIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
-        replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.shallowEntries.iterator
+      val recordBatchIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
+        replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.batches.iterator
       }
       val maxHw = fetchResponsePerReplica.values.map(_.hw).max
 
@@ -283,32 +280,32 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
       var isMessageInAllReplicas = true
       while (isMessageInAllReplicas) {
         var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
-        for ((replicaId, logEntriesIterator) <- logEntryIteratorMap) {
+        for ((replicaId, recordBatchIterator) <- recordBatchIteratorMap) {
           try {
-            if (logEntriesIterator.hasNext) {
-              val logEntry = logEntriesIterator.next()
+            if (recordBatchIterator.hasNext) {
+              val batch = recordBatchIterator.next()
 
               // only verify up to the high watermark
-              if (logEntry.offset >= fetchResponsePerReplica.get(replicaId).hw)
+              if (batch.lastOffset >= fetchResponsePerReplica.get(replicaId).hw)
                 isMessageInAllReplicas = false
               else {
                 messageInfoFromFirstReplicaOpt match {
                   case None =>
                     messageInfoFromFirstReplicaOpt = Some(
-                      MessageInfo(replicaId, logEntry.offset, logEntry.nextOffset, logEntry.record.checksum))
+                      MessageInfo(replicaId, batch.lastOffset, batch.nextOffset, batch.checksum))
                   case Some(messageInfoFromFirstReplica) =>
-                    if (messageInfoFromFirstReplica.offset != logEntry.offset) {
+                    if (messageInfoFromFirstReplica.offset != batch.lastOffset) {
                       println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
                         + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
                         + messageInfoFromFirstReplica.offset + " doesn't match replica "
-                        + replicaId + "'s offset " + logEntry.offset)
+                        + replicaId + "'s offset " + batch.lastOffset)
                       Exit.exit(1)
                     }
-                    if (messageInfoFromFirstReplica.checksum != logEntry.record.checksum)
+                    if (messageInfoFromFirstReplica.checksum != batch.checksum)
                       println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
-                        + topicAndPartition + " has unmatched checksum at offset " + logEntry.offset + "; replica "
+                        + topicAndPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica "
                         + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
-                        + "; replica " + replicaId + "'s checksum " + logEntry.record.checksum)
+                        + "; replica " + replicaId + "'s checksum " + batch.checksum)
                 }
               }
             } else
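
With the new format the CRC is carried at the batch level (for magic v2 it covers the whole batch), so the verification above matches replicas on batch lastOffset and batch checksum instead of per-record fields. A compressed sketch of that comparison, ignoring the high-watermark handling and using MemoryRecords to stand in for fetched replica data; the helper name is hypothetical:

    import org.apache.kafka.common.record.MemoryRecords
    import scala.collection.JavaConverters._

    object BatchCompareSketch {
      // Returns the last offsets of the first pair of batches whose offset or CRC disagree.
      def firstMismatch(a: MemoryRecords, b: MemoryRecords): Option[(Long, Long)] =
        a.batches.asScala.toSeq.zip(b.batches.asScala.toSeq).collectFirst {
          case (x, y) if x.lastOffset != y.lastOffset || x.checksum != y.checksum =>
            (x.lastOffset, y.lastOffset)
        }
    }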

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 469fbff..35bdded 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -25,8 +25,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.ApiVersionsResponse
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
 import org.junit.Test
 
 class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
@@ -45,6 +44,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
     val nodeApiVersions = NodeApiVersions.create
     for (apiKey <- ApiKeys.values) {
       val apiVersion = nodeApiVersions.apiVersion(apiKey)
+      assertNotNull(apiVersion)
       val versionRangeStr =
         if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
         else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 22efa1f..cb4c235 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.KafkaException
 import kafka.admin.AdminUtils
 import kafka.network.SocketServer
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.{CompressionType, SimpleRecord, RecordBatch, MemoryRecords}
 
 class AuthorizerIntegrationTest extends BaseRequestTest {
 
@@ -183,8 +183,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createProduceRequest = {
-    new requests.ProduceRequest.Builder(1, 5000,
-      collection.mutable.Map(tp -> MemoryRecords.readableRecords(ByteBuffer.wrap("test".getBytes))).asJava).
+    new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
+      collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava).
       build()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 083dbf0..09b715d 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -51,12 +51,12 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
     def getGroupMetadataLogOpt: Option[Log] =
       logManager.getLog(new TopicPartition(Topic.GroupMetadataTopicName, 0))
 
-    TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.shallowEntries.asScala.nonEmpty)),
+    TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.batches.asScala.nonEmpty)),
                             "Commit message not appended in time")
 
     val logSegments = getGroupMetadataLogOpt.get.logSegments
     val incorrectCompressionCodecs = logSegments
-      .flatMap(_.log.shallowEntries.asScala.map(_.record.compressionType))
+      .flatMap(_.log.batches.asScala.map(_.compressionType))
       .filter(_ != offsetsTopicCompressionCodec)
     assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 50941ce..9ec544c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -648,8 +648,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       "value".getBytes)
     val largeRecord = new ProducerRecord(tp.topic(), tp.partition(), "large".getBytes,
       new Array[Byte](largeProducerRecordSize))
-    this.producers.head.send(smallRecord)
-    this.producers.head.send(largeRecord)
+
+    this.producers.head.send(smallRecord).get
+    this.producers.head.send(largeRecord).get
 
     // we should only get the small record in the first `poll`
     consumer0.assign(List(tp).asJava)
@@ -979,6 +980,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(s"value will not be modified", new String(record.value()))
   }
 
+  @Test
   def testConsumeMessagesWithCreateTime() {
     val numRecords = 50
     // Test non-compressed messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index ce81cae..7372ac4 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -28,6 +28,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -120,7 +121,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkUtils, topic10, servers.size, numServers, servers, topicConfig)
 
     // send a record that is too large for replication, but within the broker max message limit
-    val record = new ProducerRecord(topic10, null, "key".getBytes, new Array[Byte](maxMessageSize - 50))
+    val value = new Array[Byte](maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD)
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic10, null, value)
     val recordMetadata = producer3.send(record).get
 
     assertEquals(topic10, recordMetadata.topic)
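
The value size is now derived from the format's own overhead constants: a single-record v2 batch occupies the batch header plus the per-record overhead plus the serialized key, value and headers, so the largest value that still fits under the broker's max message size is the limit minus both overheads. A small sketch of that arithmetic, assuming a null key and no headers:

    import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}

    object MaxValueSizeSketch {
      // Upper bound on the value size of a single v2 record that fits in maxMessageSize;
      // MAX_RECORD_OVERHEAD is a worst-case bound on the record's own encoded fields.
      def maxValueSize(maxMessageSize: Int): Int =
        maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD
    }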

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 853dad6..7870485 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -91,7 +91,8 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
         override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
         override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
           fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
-            (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, -1, null)))
+            (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE,
+              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, null)))
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
index 4788b4a..c0f385f 100644
--- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
+++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
@@ -21,7 +21,7 @@ import kafka.api.FetchResponsePartitionData
 import kafka.common.TopicAndPartition
 import kafka.message.ByteBufferMessageSet
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords}
 import org.junit.Test
 import org.junit.Assert.assertTrue
 
@@ -41,10 +41,10 @@ class ReplicaVerificationToolTest {
     expectedReplicasPerTopicAndPartition.foreach { case (tp, numReplicas) =>
       (0 until numReplicas).foreach { replicaId =>
         val records = (0 to 5).map { index =>
-          Record.create(s"key $index".getBytes, s"value $index".getBytes)
+          new SimpleRecord(s"key $index".getBytes, s"value $index".getBytes)
         }
         val initialOffset = 4
-        val memoryRecords = MemoryRecords.withRecords(initialOffset, records: _*)
+        val memoryRecords = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, records: _*)
         replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE, hw = 20,
           new ByteBufferMessageSet(memoryRecords.buffer)))
       }
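
MemoryRecords.withRecords also has a variant that assigns consecutive offsets from a given base offset, which is how the test above fakes fetched replica data starting at offset 4. A minimal sketch with placeholder keys and values:

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

    object OffsetAssignedRecordsSketch {
      // Two records assigned offsets 4 and 5.
      val records: MemoryRecords = MemoryRecords.withRecords(4L, CompressionType.NONE,
        new SimpleRecord("key 0".getBytes, "value 0".getBytes),
        new SimpleRecord("key 1".getBytes, "value 1".getBytes))
    }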

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 5d889d5..ca03ba8 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -140,13 +140,13 @@ object TestLogCleaning {
     require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
     for (file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
       val fileRecords = FileRecords.open(new File(dir, file))
-      for (entry <- fileRecords.shallowEntries.asScala) {
-        val key = TestUtils.readString(entry.record.key)
-        val content = 
-          if(entry.record.hasNullValue)
+      for (entry <- fileRecords.records.asScala) {
+        val key = TestUtils.readString(entry.key)
+        val content =
+          if (!entry.hasValue)
             null
           else
-            TestUtils.readString(entry.record.value)
+            TestUtils.readString(entry.value)
         println("offset = %s, key = %s, content = %s".format(entry.offset, key, content))
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index a67c166..aff3b2f 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -98,8 +98,8 @@ object StressTestLog {
       try {
         log.read(offset, 1024, Some(offset+1)).records match {
           case read: FileRecords if read.sizeInBytes > 0 => {
-            val first = read.shallowEntries.iterator.next()
-            require(first.offset == offset, "We should either read nothing or the message we asked for.")
+            val first = read.batches.iterator.next()
+            require(first.lastOffset == offset, "We should either read nothing or the message we asked for.")
             require(first.sizeInBytes == read.sizeInBytes, "Expected %d but got %d.".format(first.sizeInBytes, read.sizeInBytes))
             offset += 1
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 90236bb..9c29679 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -26,7 +26,7 @@ import joptsimple._
 import kafka.log._
 import kafka.message._
 import kafka.utils._
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{Time, Utils}
 
 import scala.math._
@@ -102,8 +102,12 @@ object TestLinearWriteSpeed {
     val rand = new Random
     rand.nextBytes(buffer.array)
     val numMessages = bufferSize / (messageSize + MessageSet.LogOverhead)
-    val messageSet = MemoryRecords.withRecords(CompressionType.forId(compressionCodec.codec),
-      (0 until numMessages).map(_ => Record.create(new Array[Byte](messageSize))): _*)
+    val createTime = System.currentTimeMillis
+    val messageSet = {
+      val compressionType = CompressionType.forId(compressionCodec.codec)
+      val records = (0 until numMessages).map(_ => new SimpleRecord(createTime, null, new Array[Byte](messageSize)))
+      MemoryRecords.withRecords(compressionType, records: _*)
+    }
 
     val writables = new Array[Writable](numFiles)
     val scheduler = new KafkaScheduler(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 22cb899..8a198eb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -17,20 +17,21 @@
 
 package kafka.coordinator
 
-import kafka.utils.timer.MockTimer
-import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType}
-import org.junit.Assert._
+import java.util.concurrent.TimeUnit
+
 import kafka.common.{OffsetAndMetadata, Topic}
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
 import kafka.utils._
+import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse}
+import org.apache.kafka.common.record.{RecordBatch, MemoryRecords, TimestampType}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse}
 import org.easymock.{Capture, EasyMock, IAnswer}
+import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
-import java.util.concurrent.TimeUnit
 
 import scala.collection._
 import scala.concurrent.duration.Duration
@@ -305,7 +306,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     timer.advanceClock(DefaultSessionTimeout + 100)
@@ -1051,10 +1052,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
-          new PartitionResponse(Errors.NONE, 0L, Record.NO_TIMESTAMP)
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
         )
       )})
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
@@ -1132,10 +1133,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
-          new PartitionResponse(Errors.NONE, 0L, Record.NO_TIMESTAMP)
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
         )
       )})
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
@@ -1146,7 +1147,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
     groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 8cfae8d..6b1abf3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -24,19 +24,19 @@ import kafka.cluster.Partition
 import kafka.common.{OffsetAndMetadata, Topic}
 import kafka.log.{Log, LogAppendInfo}
 import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
+import kafka.utils.TestUtils.fail
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record, TimestampType}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.{Before, Test}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
-import kafka.utils.TestUtils.fail
+import org.junit.{Before, Test}
 
+import scala.collection.JavaConverters._
 import scala.collection._
-import JavaConverters._
 
 class GroupMetadataManagerTest {
 
@@ -94,7 +94,7 @@ class GroupMetadataManagerTest {
     )
 
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
-    val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords: _*)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, offsetCommitRecords: _*)
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
     EasyMock.replay(replicaManager)
@@ -123,8 +123,9 @@ class GroupMetadataManagerTest {
     )
 
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
-    val tombstone = Record.create(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null)
-    val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords ++ Seq(tombstone): _*)
+    val tombstone = new SimpleRecord(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(tombstone): _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -157,7 +158,8 @@ class GroupMetadataManagerTest {
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
-    val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -183,8 +185,9 @@ class GroupMetadataManagerTest {
 
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
-    val groupMetadataTombstone = Record.create(GroupMetadataManager.groupMetadataKey(groupId), null)
-    val records = MemoryRecords.withRecords(startOffset, Seq(groupMetadataRecord, groupMetadataTombstone): _*)
+    val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      Seq(groupMetadataRecord, groupMetadataTombstone): _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -212,8 +215,8 @@ class GroupMetadataManagerTest {
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
-    val groupMetadataTombstone = Record.create(GroupMetadataManager.groupMetadataKey(groupId), null)
-    val records = MemoryRecords.withRecords(startOffset,
+    val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
       Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -538,7 +541,7 @@ class GroupMetadataManagerTest {
     EasyMock.reset(partition)
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -549,13 +552,15 @@ class GroupMetadataManagerTest {
     assertTrue(recordsCapture.hasCaptured)
 
     val records = recordsCapture.getValue.records.asScala.toList
+    recordsCapture.getValue.batches.asScala.foreach { batch =>
+      assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, batch.magic)
+      assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
+    }
     assertEquals(1, records.size)
 
     val metadataTombstone = records.head
     assertTrue(metadataTombstone.hasKey)
-    assertTrue(metadataTombstone.hasNullValue)
-    assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic)
-    assertEquals(TimestampType.CREATE_TIME, metadataTombstone.timestampType)
+    assertFalse(metadataTombstone.hasValue)
     assertTrue(metadataTombstone.timestamp > 0)
 
     val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
@@ -583,7 +588,7 @@ class GroupMetadataManagerTest {
     EasyMock.reset(partition)
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -594,14 +599,16 @@ class GroupMetadataManagerTest {
     assertTrue(recordsCapture.hasCaptured)
 
     val records = recordsCapture.getValue.records.asScala.toList
+    recordsCapture.getValue.batches.asScala.foreach { batch =>
+      assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, batch.magic)
+      // Use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+      assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
+    }
     assertEquals(1, records.size)
 
     val metadataTombstone = records.head
     assertTrue(metadataTombstone.hasKey)
-    assertTrue(metadataTombstone.hasNullValue)
-    assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic)
-    // Use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
-    assertEquals(TimestampType.CREATE_TIME, metadataTombstone.timestampType)
+    assertFalse(metadataTombstone.hasValue)
     assertTrue(metadataTombstone.timestamp > 0)
 
     val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
@@ -672,7 +679,7 @@ class GroupMetadataManagerTest {
     assertEquals(2, records.size)
     records.foreach { message =>
       assertTrue(message.hasKey)
-      assertTrue(message.hasNullValue)
+      assertFalse(message.hasValue)
       val offsetKey = GroupMetadataManager.readMessageKey(message.key).asInstanceOf[OffsetKey]
       assertEquals(groupId, offsetKey.key.group)
       assertEquals("foo", offsetKey.key.topicPartition.topic)
@@ -758,13 +765,13 @@ class GroupMetadataManagerTest {
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
-          new PartitionResponse(error, 0L, Record.NO_TIMESTAMP)
+          new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
         )
       )})
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1))
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
   }
 
-  private def buildStableGroupRecordWithMember(memberId: String): Record = {
+  private def buildStableGroupRecordWithMember(memberId: String): SimpleRecord = {
     val group = new GroupMetadata(groupId)
     group.transitionTo(PreparingRebalance)
     val memberProtocols = List(("roundrobin", Array.emptyByteArray))
@@ -776,13 +783,13 @@ class GroupMetadataManagerTest {
 
     val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
     val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]))
-    Record.create(groupMetadataKey, groupMetadataValue)
+    new SimpleRecord(groupMetadataKey, groupMetadataValue)
   }
 
   private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
                                       startOffset: Long,
                                       records: MemoryRecords): Unit = {
-    val endOffset = startOffset + records.deepEntries.asScala.size
+    val endOffset = startOffset + records.records.asScala.size
     val logMock =  EasyMock.mock(classOf[Log])
     val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
 
@@ -798,12 +805,12 @@ class GroupMetadataManagerTest {
   }
 
   private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
-                                           groupId: String = groupId): Seq[Record] = {
+                                           groupId: String = groupId): Seq[SimpleRecord] = {
     committedOffsets.map { case (topicPartition, offset) =>
       val offsetAndMetadata = OffsetAndMetadata(offset)
       val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
       val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
-      Record.create(offsetCommitKey, offsetCommitValue)
+      new SimpleRecord(offsetCommitKey, offsetCommitValue)
     }.toSeq
   }
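
Two related changes run through this test: tombstones are now plain SimpleRecord instances with a null value, and the read-side check flips from record.hasNullValue to !record.hasValue. A small sketch of both, assuming the record API as used in this patch:

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
    import scala.collection.JavaConverters._

    object TombstoneSketch {
      // A tombstone is a keyed record with a null value.
      val tombstone = new SimpleRecord("some-key".getBytes, null: Array[Byte])
      val records: MemoryRecords = MemoryRecords.withRecords(CompressionType.NONE, tombstone)

      // On the read side, "no value" is now expressed as !hasValue.
      def containsTombstone: Boolean =
        records.records.asScala.exists(r => r.hasKey && !r.hasValue)
    }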
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 6a165ed..0b1978c 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -25,7 +25,7 @@ import org.junit.Assert._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
+import org.apache.kafka.common.record.{SimpleRecord, CompressionType, MemoryRecords}
 import org.apache.kafka.common.utils.Utils
 import java.util.{Collection, Properties}
 
@@ -57,16 +57,16 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
 
     /* append two messages */
     log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),
-      Record.create("hello".getBytes), Record.create("there".getBytes)))
+      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
 
-    def readMessage(offset: Int) = log.read(offset, 4096).records.shallowEntries.iterator.next().record
+    def readBatch(offset: Int) = log.read(offset, 4096).records.batches.iterator.next()
 
     if (!brokerCompression.equals("producer")) {
       val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression)
-      assertEquals("Compression at offset 0 should produce " + brokerCompressionCode.name, brokerCompressionCode.codec, readMessage(0).compressionType.id)
+      assertEquals("Compression at offset 0 should produce " + brokerCompressionCode.name, brokerCompressionCode.codec, readBatch(0).compressionType.id)
     }
     else
-      assertEquals("Compression at offset 0 should produce " + messageCompressionCode.name, messageCompressionCode.codec, readMessage(0).compressionType.id)
+      assertEquals("Compression at offset 0 should produce " + messageCompressionCode.name, messageCompressionCode.codec, readBatch(0).compressionType.id)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 6a097f8..f2dbc6e 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -20,11 +20,11 @@ package kafka.log
 import java.io.File
 import java.util.Properties
 
-import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
+import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0, KAFKA_0_11_0_IV0}
 import kafka.server.OffsetCheckpoint
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit._
@@ -55,7 +55,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   @Test
   def cleanerTest() {
     val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Record.MAGIC_VALUE_V1)
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
     val maxMessageSize = largeMessageSet.sizeInBytes
 
     cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
@@ -146,7 +146,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   @Test
   def testCleanerWithMessageFormatV0(): Unit = {
     val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Record.MAGIC_VALUE_V0)
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0)
     val maxMessageSize = codec match {
       case CompressionType.NONE => largeMessageSet.sizeInBytes
       case _ =>
@@ -164,7 +164,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
     log.config = new LogConfig(props)
 
-    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
+    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
     val startSize = log.size
     cleaner.startup()
 
@@ -176,15 +176,16 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     checkLogAfterAppendingDups(log, startSize, appends)
 
     val appends2: Seq[(Int, String, Long)] = {
-      val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
+      val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
       val appendInfo = log.append(largeMessageSet, assignOffsets = true)
       val largeMessageOffset = appendInfo.firstOffset
 
-      // also add some messages with version 1 to check that we handle mixed format versions correctly
-      props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
+      // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
+      props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_11_0_IV0.version)
       log.config = new LogConfig(props)
-      val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
-      appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1
+      val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
+      val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V2)
+      appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2
     }
     val firstDirty2 = log.activeSegment.baseOffset
     checkLastCleaned("log", 0, firstDirty2)
@@ -204,15 +205,15 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
     // with compression enabled, these messages will be written as a single message containing
     // all of the individual messages
-    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
-    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V0)
+    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
+    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
 
     props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_10_0_IV1.version)
     log.config = new LogConfig(props)
 
-    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
-    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
-    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = Record.MAGIC_VALUE_V1)
+    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
+    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
 
     val appends = appendsV0 ++ appendsV1
 
@@ -251,15 +252,15 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
   private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
     import JavaConverters._
-    for (segment <- log.logSegments; deepLogEntry <- segment.log.deepEntries.asScala) yield {
-      val key = TestUtils.readString(deepLogEntry.record.key).toInt
-      val value = TestUtils.readString(deepLogEntry.record.value)
+    for (segment <- log.logSegments; deepLogEntry <- segment.log.records.asScala) yield {
+      val key = TestUtils.readString(deepLogEntry.key).toInt
+      val value = TestUtils.readString(deepLogEntry.value)
       (key, value, deepLogEntry.offset)
     }
   }
 
   private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-                        startKey: Int = 0, magicValue: Byte = Record.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
+                        startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
     for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val value = counter.toString
       val appendInfo = log.append(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
@@ -270,19 +271,18 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   }
 
   private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-                                        startKey: Int = 0, magicValue: Byte = Record.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
+                                        startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
     val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val payload = counter.toString
       counter += 1
       (key, payload)
     }
 
-    val messages = kvs.map { case (key, payload) =>
-      Record.create(magicValue, key.toString.getBytes, payload.toString.getBytes)
+    val records = kvs.map { case (key, payload) =>
+      new SimpleRecord(key.toString.getBytes, payload.toString.getBytes)
     }
 
-    val records = MemoryRecords.withRecords(codec, messages: _*)
-    val appendInfo = log.append(records, assignOffsets = true)
+    val appendInfo = log.append(MemoryRecords.withRecords(magicValue, codec, records: _*), assignOffsets = true)
     val offsets = appendInfo.firstOffset to appendInfo.lastOffset
 
     kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
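
For reference, here is a minimal, self-contained sketch of the construction pattern used by writeDupsSingleMessageSet above: several SimpleRecords packed into one compressed batch at an explicit magic value, then read back through the record-level view that replaces deepEntries. The class and method names are taken from this patch; the object name, compression codec and sample values are illustrative only.

    import java.nio.charset.StandardCharsets.UTF_8

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}

    import scala.collection.JavaConverters._

    object SingleBatchSketch extends App {
      // Keys and payloads only; offsets and timestamps are assigned when the batch
      // is built, or by Log.append when called with assignOffsets = true.
      val simpleRecords = (0 until 3).map { key =>
        new SimpleRecord(key.toString.getBytes(UTF_8), s"payload-$key".getBytes(UTF_8))
      }

      // A single compressed batch written at an explicit magic value.
      val records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, simpleRecords: _*)

      // Record-level iteration decompresses the batch transparently and exposes
      // key, value and offset directly on each record.
      for (record <- records.records.asScala)
        println(s"offset=${record.offset} key=${UTF_8.decode(record.key)} value=${UTF_8.decode(record.value)}")
    }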

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index c24cb68..4d8c836 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -106,9 +106,9 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
   private def readFromLog(log: Log): Iterable[(Int, Int)] = {
     import JavaConverters._
 
-    for (segment <- log.logSegments; logEntry <- segment.log.deepEntries.asScala) yield {
-      val key = TestUtils.readString(logEntry.record.key).toInt
-      val value = TestUtils.readString(logEntry.record.value).toInt
+    for (segment <- log.logSegments; record <- segment.log.records.asScala) yield {
+      val key = TestUtils.readString(record.key).toInt
+      val value = TestUtils.readString(record.value).toInt
       key -> value
     }
   }
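
The readFromLog change here follows the same pattern: the old shallow/deep entry views map onto the new batches() and records() views. A rough sketch of that distinction, assuming the FileRecords API as used in this patch (the object and method names below are illustrative):

    import org.apache.kafka.common.record.FileRecords

    import scala.collection.JavaConverters._

    object RecordViewsSketch {
      // Batch-level view (roughly the old shallowEntries): one entry per stored
      // batch, with compressed batches treated as a unit.
      def batchLastOffsets(fileRecords: FileRecords): Seq[Long] =
        fileRecords.batches.asScala.map(_.lastOffset).toSeq

      // Record-level view (roughly the old deepEntries): every individual record,
      // decompressing batches on the fly.
      def recordOffsets(fileRecords: FileRecords): Seq[Long] =
        fileRecords.records.asScala.map(_.offset).toSeq
    }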

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index b1d2b33..3690c55 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit.{After, Test}
@@ -100,7 +100,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     while(log.numberOfSegments < 8)
-      log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds))
+      log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()))
 
     val topicPartition = new TopicPartition("log", 0)
     val lastClean = Map(topicPartition -> 0L)
@@ -123,7 +123,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     val t0 = time.milliseconds
     while(log.numberOfSegments < 4)
-      log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
+      log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0))
 
     val activeSegAtT0 = log.activeSegment
 
@@ -131,7 +131,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val t1 = time.milliseconds
 
     while (log.numberOfSegments < 8)
-      log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1))
+      log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1))
 
     val topicPartition = new TopicPartition("log", 0)
     val lastClean = Map(topicPartition -> 0L)
@@ -155,7 +155,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
 
     val t0 = time.milliseconds
     while (log.numberOfSegments < 8)
-      log.append(logEntries(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
+      log.append(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0))
 
     time.sleep(compactionLag + 1)
 
@@ -192,7 +192,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
-  private def logEntries(key: Int, value: Int, timestamp: Long) =
-    MemoryRecords.withRecords(Record.create(timestamp, key.toString.getBytes, value.toString.getBytes))
+  private def records(key: Int, value: Int, timestamp: Long) =
+    MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))
 
 }
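
For comparison with the removed Record.create call, a minimal sketch of the new single-record construction (constructor and method names as used in this patch; the object name and sample values are illustrative). The overload that takes only a CompressionType leaves the magic value and base offset at their defaults:

    import java.nio.charset.StandardCharsets.UTF_8

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

    object TimestampedRecordSketch extends App {
      val timestamp = System.currentTimeMillis()

      // SimpleRecord carries only timestamp, key and value; batch-level fields
      // such as magic, offsets and CRC are filled in when the batch is built.
      val records = MemoryRecords.withRecords(CompressionType.NONE,
        new SimpleRecord(timestamp, "key".getBytes(UTF_8), "value".getBytes(UTF_8)))

      println(s"built ${records.sizeInBytes} bytes")
    }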

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index cf1e4cb..18c1bbe 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -31,8 +31,8 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import org.scalatest.junit.JUnitSuite
 
+import scala.collection.JavaConverters._
 import scala.collection._
-import JavaConverters._
 
 /**
  * Unit tests for the log cleaning logic
@@ -231,7 +231,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // the last (active) segment has just one message
 
-    def distinctValuesBySegment = log.logSegments.map(s => s.log.shallowEntries.asScala.map(m => TestUtils.readString(m.record.value)).toSet.size).toSeq
+    def distinctValuesBySegment = log.logSegments.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq
 
     val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
     assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.",
@@ -255,7 +255,7 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     // create 6 segments with only one message in each segment
-    val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+    val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
       log.append(messageSet, assignOffsets = true)
 
@@ -273,7 +273,7 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     // create 6 segments with only one message in each segment
-    val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+    val messageSet = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
       log.append(messageSet, assignOffsets = true)
 
@@ -326,14 +326,14 @@ class LogCleanerTest extends JUnitSuite {
 
   /* extract all the keys from a log */
   def keysInLog(log: Log): Iterable[Int] =
-    log.logSegments.flatMap(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => TestUtils.readString(m.record.key).toInt))
+    log.logSegments.flatMap(s => s.log.records.asScala.filter(_.hasValue).filter(_.hasKey).map(record => TestUtils.readString(record.key).toInt))
 
   /* extract all the offsets from a log */
   def offsetsInLog(log: Log): Iterable[Long] =
-    log.logSegments.flatMap(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).filter(_.record.hasKey).map(m => m.offset))
+    log.logSegments.flatMap(s => s.log.records.asScala.filter(_.hasValue).filter(_.hasKey).map(m => m.offset))
 
   def unkeyedMessageCountInLog(log: Log) =
-    log.logSegments.map(s => s.log.shallowEntries.asScala.filter(!_.record.hasNullValue).count(m => !m.record.hasKey)).sum
+    log.logSegments.map(s => s.log.records.asScala.filter(_.hasValue).count(m => !m.hasKey)).sum
 
   def abortCheckDone(topicPartition: TopicPartition): Unit = {
     throw new LogCleaningAbortedException()
@@ -423,7 +423,7 @@ class LogCleanerTest extends JUnitSuite {
     val cleaner = makeCleaner(Int.MaxValue)
 
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, 400: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@@ -433,7 +433,7 @@ class LogCleanerTest extends JUnitSuite {
       log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
 
     // forward offset and append message to next segment at offset Int.MaxValue
-    val records = MemoryRecords.withLogEntries(LogEntry.create(Int.MaxValue - 1, Record.create("hello".getBytes, "hello".getBytes)))
+    val records = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue - 1)
     log.append(records, assignOffsets = false)
     log.append(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes))
     assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
@@ -602,8 +602,8 @@ class LogCleanerTest extends JUnitSuite {
   def testBuildOffsetMapFakeLarge(): Unit = {
     val map = new FakeOffsetMap(1000)
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.SegmentBytesProp, 120: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 120: java.lang.Integer)
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     val logConfig = LogConfig(logProps)
     val log = makeLog(config = logConfig)
@@ -681,10 +681,10 @@ class LogCleanerTest extends JUnitSuite {
 
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
 
-    for (segment <- log.logSegments; shallowLogEntry <- segment.log.shallowEntries.asScala; deepLogEntry <- shallowLogEntry.asScala) {
-      assertEquals(shallowLogEntry.record.magic, deepLogEntry.record.magic)
-      val value = TestUtils.readString(deepLogEntry.record.value).toLong
-      assertEquals(deepLogEntry.offset, value)
+    for (segment <- log.logSegments; batch <- segment.log.batches.asScala; record <- batch.asScala) {
+      assertTrue(record.hasMagic(batch.magic))
+      val value = TestUtils.readString(record.value).toLong
+      assertEquals(record.offset, value)
     }
   }
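
The nested loop above relies on a RecordBatch being an Iterable over its records, with batch-level metadata such as magic and lastOffset available next to the per-record fields. A short sketch of the same check, assuming batches() is available on the common Records interface as it is on FileRecords and MemoryRecords in this patch (the object and method names are illustrative):

    import org.apache.kafka.common.record.Records

    import scala.collection.JavaConverters._

    object BatchMagicSketch {
      // Verify that every record reports the magic value of the batch that holds it.
      def allRecordsMatchBatchMagic(records: Records): Boolean =
        records.batches.asScala.forall { batch =>
          batch.asScala.forall(record => record.hasMagic(batch.magic))
        }
    }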
 
@@ -703,9 +703,9 @@ class LogCleanerTest extends JUnitSuite {
     val corruptedMessage = invalidCleanedMessage(offset, set)
     val records = MemoryRecords.readableRecords(corruptedMessage.buffer)
 
-    for (logEntry <- records.deepEntries.asScala) {
+    for (logEntry <- records.records.asScala) {
       val offset = logEntry.offset
-      val value = TestUtils.readString(logEntry.record.value).toLong
+      val value = TestUtils.readString(logEntry.value).toLong
       assertEquals(offset, value)
     }
   }
@@ -729,19 +729,19 @@ class LogCleanerTest extends JUnitSuite {
                                           timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000))
     log.roll()
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset))
-    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowEntries.iterator().next().offset())
+    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset)
     // Append a message and roll out another log segment.
     log.append(TestUtils.singletonRecords(value = "1".getBytes,
                                           key = "1".getBytes,
                                           timestamp = time.milliseconds()))
     log.roll()
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
-    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.shallowEntries.iterator().next().offset())
+    assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset)
   }
 
   private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
-      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
+      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).lastOffset
   }
 
   private def invalidCleanedMessage(initialOffset: Long,
@@ -751,25 +751,28 @@ class LogCleanerTest extends JUnitSuite {
     // would write invalid compressed message sets with the outer magic set to 1 and the inner
     // magic set to 0
     val records = keysAndValues.map(kv =>
-      Record.create(Record.MAGIC_VALUE_V0,
-        Record.NO_TIMESTAMP,
+      LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0,
+        RecordBatch.NO_TIMESTAMP,
         kv._1.toString.getBytes,
         kv._2.toString.getBytes))
 
     val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16))
-    val builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, codec, TimestampType.CREATE_TIME)
+    val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, codec, TimestampType.CREATE_TIME, initialOffset)
 
     var offset = initialOffset
     records.foreach { record =>
-      builder.appendUnchecked(offset, record)
+      builder.appendUncheckedWithOffset(offset, record)
       offset += 1
     }
 
     builder.build()
   }
 
-  private def messageWithOffset(key: Int, value: Int, offset: Long) =
-    MemoryRecords.withLogEntries(LogEntry.create(offset, Record.create(key.toString.getBytes, value.toString.getBytes)))
+  private def messageWithOffset(key: Array[Byte], value: Array[Byte], offset: Long): MemoryRecords =
+    MemoryRecords.withRecords(offset, CompressionType.NONE, new SimpleRecord(key, value))
+
+  private def messageWithOffset(key: Int, value: Int, offset: Long): MemoryRecords =
+    messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
 
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
@@ -797,10 +800,10 @@ class LogCleanerTest extends JUnitSuite {
     record(key, value.toString.getBytes)
 
   def record(key: Int, value: Array[Byte]) =
-    MemoryRecords.withRecords(Record.create(key.toString.getBytes, value))
+    TestUtils.singletonRecords(key = key.toString.getBytes, value = value)
 
   def unkeyedRecord(value: Int) =
-    MemoryRecords.withRecords(Record.create(value.toString.getBytes))
+    TestUtils.singletonRecords(value = value.toString.getBytes)
 
   def tombstoneRecord(key: Int) = record(key, null)
 
@@ -832,5 +835,5 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
 
   def latestOffset: Long = lastOffset
 
-  override def toString: String = map.toString()
+  override def toString: String = map.toString
 }
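
invalidCleanedMessage above deliberately reproduces the old broker behaviour of a magic v1 wrapper around magic v0 inner messages by appending LegacyRecords without validation. A condensed sketch of that builder usage, with the signatures taken from the diff and the buffer size, base offset and values chosen for illustration:

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets.UTF_8

    import org.apache.kafka.common.record._

    import scala.collection.JavaConverters._

    object MixedMagicSketch extends App {
      // Magic v0 payloads, as an old producer would have written them.
      val legacy = (0 until 3).map { i =>
        LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP,
          i.toString.getBytes(UTF_8), s"v$i".getBytes(UTF_8))
      }

      // A magic v1 builder starting at base offset 10; appendUncheckedWithOffset
      // skips validation, which is what permits the mismatched inner magic.
      val buffer = ByteBuffer.allocate(1024)
      val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1,
        CompressionType.GZIP, TimestampType.CREATE_TIME, 10L)
      legacy.zipWithIndex.foreach { case (record, i) =>
        builder.appendUncheckedWithOffset(10L + i, record)
      }

      // The record-level view still resolves the inner offsets and values.
      val records = builder.build()
      records.records.asScala.foreach(r => println(s"offset=${r.offset}"))
    }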

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 1197b02..25b3480 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -17,7 +17,8 @@
  package kafka.log
 
 import kafka.utils.TestUtils
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record}
+import kafka.utils.TestUtils.checkEquals
+import org.apache.kafka.common.record.{RecordBatch, _}
 import org.apache.kafka.common.utils.Time
 import org.junit.Assert._
 import org.junit.{After, Test}
@@ -46,7 +47,8 @@ class LogSegmentTest {
   
   /* create a MemoryRecords for the given messages starting from the given offset */
   def records(offset: Long, records: String*): MemoryRecords = {
-    MemoryRecords.withRecords(offset, records.map(s => Record.create(Record.MAGIC_VALUE_V1, offset * 10, s.getBytes)):_*)
+    MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, offset, CompressionType.NONE, TimestampType.CREATE_TIME,
+      records.map { s => new SimpleRecord(offset * 10, s.getBytes) }: _*)
   }
 
   @After
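
The records helper above uses the fully specified overload: explicit magic value, base offset, compression and timestamp type, with value-only SimpleRecords carrying their own timestamps. A small sketch of that overload, with the call shape taken from the helper and the object name, values and checks added for illustration:

    import java.nio.charset.StandardCharsets.UTF_8

    import org.apache.kafka.common.record._

    import scala.collection.JavaConverters._

    object OffsetAssignedBatchSketch extends App {
      val baseOffset = 50L
      val values = Seq("hello", "there", "bee")

      // Unkeyed records with explicit CREATE_TIME timestamps; offsets are assigned
      // sequentially starting from baseOffset.
      val ms = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, baseOffset,
        CompressionType.NONE, TimestampType.CREATE_TIME,
        values.map(v => new SimpleRecord(baseOffset * 10, v.getBytes(UTF_8))): _*)

      ms.records.asScala.zipWithIndex.foreach { case (r, i) =>
        assert(r.offset == baseOffset + i)
      }
    }
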
@@ -75,9 +77,9 @@ class LogSegmentTest {
   def testReadBeforeFirstOffset() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there", "little", "bee")
-    seg.append(50, 53, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(50, 53, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records
-    assertEquals(ms.deepEntries.asScala.toList, read.deepEntries.asScala.toList)
+    checkEquals(ms.records.iterator, read.records.iterator)
   }
 
   /**
@@ -89,10 +91,10 @@ class LogSegmentTest {
     val baseOffset = 50
     val seg = createSegment(baseOffset)
     val ms = records(baseOffset, "hello", "there", "beautiful")
-    seg.append(baseOffset, 52, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(baseOffset, 52, RecordBatch.NO_TIMESTAMP, -1L, ms)
     def validate(offset: Long) =
-      assertEquals(ms.deepEntries.asScala.filter(_.offset == offset).toList,
-                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.deepEntries.asScala.toList)
+      assertEquals(ms.records.asScala.filter(_.offset == offset).toList,
+                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.records.asScala.toList)
     validate(50)
     validate(51)
     validate(52)
@@ -105,7 +107,7 @@ class LogSegmentTest {
   def testReadAfterLast() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
     assertNull("Read beyond the last offset in the segment should give null", read)
   }
@@ -118,11 +120,11 @@ class LogSegmentTest {
   def testReadFromGap() {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
+    seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
+    checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
   /**
@@ -135,17 +137,17 @@ class LogSegmentTest {
     var offset = 40
     for (_ <- 0 until 30) {
       val ms1 = records(offset, "hello")
-      seg.append(offset, offset, Record.NO_TIMESTAMP, -1L, ms1)
+      seg.append(offset, offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
       val ms2 = records(offset + 1, "hello")
-      seg.append(offset + 1, offset + 1, Record.NO_TIMESTAMP, -1L, ms2)
+      seg.append(offset + 1, offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2)
       // check that we can read back both messages
       val read = seg.read(offset, None, 10000)
-      assertEquals(List(ms1.deepEntries.iterator.next(), ms2.deepEntries.iterator.next()), read.records.deepEntries.asScala.toList)
+      assertEquals(List(ms1.records.iterator.next(), ms2.records.iterator.next()), read.records.records.asScala.toList)
       // now truncate off the last message
       seg.truncateTo(offset + 1)
       val read2 = seg.read(offset, None, 10000)
-      assertEquals(1, read2.records.deepEntries.asScala.size)
-      assertEquals(ms1.deepEntries.iterator.next(), read2.records.deepEntries.iterator.next())
+      assertEquals(1, read2.records.records.asScala.size)
+      checkEquals(ms1.records.iterator, read2.records.records.iterator)
       offset += 1
     }
   }
@@ -175,10 +177,10 @@ class LogSegmentTest {
   def testTruncateFull() {
     // test the case where we fully truncate the log
     val seg = createSegment(40)
-    seg.append(40, 41, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+    seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
     seg.truncateTo(0)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
-    seg.append(40, 41, Record.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+    seg.append(40, 41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
   }
 
   /**
@@ -214,7 +216,7 @@ class LogSegmentTest {
   def testNextOffsetCalculation() {
     val seg = createSegment(40)
     assertEquals(40, seg.nextOffset)
-    seg.append(50, 52, Record.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
+    seg.append(50, 52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
     assertEquals(53, seg.nextOffset())
   }
 
@@ -241,12 +243,12 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
+      seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
     val indexFile = seg.index.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100)
-      assertEquals(i, seg.read(i, Some(i + 1), 1024).records.deepEntries.iterator.next().offset)
+      assertEquals(i, seg.read(i, Some(i + 1), 1024).records.records.iterator.next().offset)
   }
 
   /**
@@ -277,7 +279,7 @@ class LogSegmentTest {
     for (_ <- 0 until 10) {
       val seg = createSegment(0)
       for(i <- 0 until messagesAppended)
-        seg.append(i, i, Record.NO_TIMESTAMP, -1L, records(i, i.toString))
+        seg.append(i, i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
       val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the way to the end
 
@@ -285,7 +287,8 @@ class LogSegmentTest {
       val position = recordPosition.position + TestUtils.random.nextInt(15)
       TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
       seg.recover(64*1024)
-      assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.shallowEntries.asScala.map(_.offset).toList)
+      assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList,
+        seg.log.batches.asScala.map(_.lastOffset).toList)
       seg.delete()
     }
   }
@@ -293,7 +296,8 @@ class LogSegmentTest {
   /* create a segment with pre allocate */
   def createSegment(offset: Long, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean): LogSegment = {
     val tempDir = TestUtils.tempDir()
-    val seg = new LogSegment(tempDir, offset, 10, 1000, 0, Time.SYSTEM, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate)
+    val seg = new LogSegment(tempDir, offset, 10, 1000, 0, Time.SYSTEM, fileAlreadyExists = fileAlreadyExists,
+      initFileSize = initFileSize, preallocate = preallocate)
     segments += seg
     seg
   }
@@ -303,11 +307,11 @@ class LogSegmentTest {
   def testCreateWithInitFileSizeAppendMessage() {
     val seg = createSegment(40, false, 512*1024*1024, true)
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
+    seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
+    checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
   /* create a segment with pre allocate and cleanly shut down */
@@ -317,11 +321,11 @@ class LogSegmentTest {
     val seg = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, false, 512*1024*1024, true)
 
     val ms = records(50, "hello", "there")
-    seg.append(50, 51, Record.NO_TIMESTAMP, -1L, ms)
+    seg.append(50, 51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(60, 61, Record.NO_TIMESTAMP, -1L, ms2)
+    seg.append(60, 61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.deepEntries.asScala.toList, read.records.deepEntries.asScala.toList)
+    checkEquals(ms2.records.iterator, read.records.records.iterator)
     val oldSize = seg.log.sizeInBytes()
     val oldPosition = seg.log.channel.position
     val oldFileSize = seg.log.file.length
@@ -334,7 +338,7 @@ class LogSegmentTest {
     segments += segReopen
 
     val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.deepEntries.asScala.toList, readAgain.records.deepEntries.asScala.toList)
+    checkEquals(ms2.records.iterator, readAgain.records.records.iterator)
     val size = segReopen.log.sizeInBytes()
     val position = segReopen.log.channel.position
     val fileSize = segReopen.log.file.length